HDFS and AirFlow interaction. PythonVirtualnvOperator FileNotFoundError: No such file or directory: 'hadoop'

Я пытаюсь подружить AirFlow и HDFS, так, чтобы данные забирались из папки, лежащей на hdfs(features_path), процессились скриптом и складывались в другую папку на hdfs (processed_features_path).

Работаю в облаке, в котором развернут инстанс airflow. Многих библиотек в инстансе нет, потому приходится использовать PythonVirtualenvOperator. Библиотека virtualenv установлена. Изначально даг падал с ошибкой: no module fsspec, далее no module pyarrow, теперь падает с : FileNotFoundError: [Errno 2] No such file or directory: 'hadoop'.

По мере появления ошибок добавлял библиотеки в requirements(fsspec,pyarrow,hdfs), но с последней ошибкой уже без понятия что делать.

Прошу помочь!

#imports
from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

#configuration
DAG_NAME = "name"

args = {'owner': 'airflow',
        'depends_on_past': False,
        'email': ["[email protected]"],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
        'queue': 'airflow'}

#tasks
def process_features():

    #imports
    import os
    import subprocess
    import pandas as pd

    #paths
    features_path = 'hdfs://some_hdfs_path_1'
    processed_features_path = 'hdfs://some_hdfs_path_2'
    fillna_path   = 'hdfs://some_hdfs_path_3'
    final_features_path = 'hdfs://some_hdfs_path_4'

    #read
    df = pd.read_pickle(features_path)
    features_list = pd.read_excel(final_features_path)
    scorer_fillna_value = pd.read_excel(fillna_path).to_dict()

    #calculate
    df_model = df[features_list].fillna(scorer_fillna_value)
    df_id = df[['id_col_1','id_col_2']]

    #save
    df_model.to_excel(os.path.join(processed_features_path,'filename_1.xlsx'))
    df_id.to_excel(os.path.join(processed_features_path,'filename_2.xlsx'))


#dag 
with DAG(
    DAG_NAME,
    default_args=args,
    schedule_interval='30 23 * * *',
    start_date=days_ago(1),
) as dag:

    start = DummyOperator(
        task_id='start',
        dag=dag
    )

    task_process_features = PythonVirtualenvOperator(
        task_id = 'process_features',
        python_callable = process_features,
        requirements = ['pandas==1.3.0','pyarrow==2.0.0','hdfs','fsspec'],
        python_version = '3.8.7',
        provide_context=True,
        system_site_packages=False,
        dag=dag
        )

    end = DummyOperator(
        task_id='end',
        dag=dag
    )
    
    start >> task_process_features >> end


Ответы (0 шт):