HDFS and AirFlow interaction. PythonVirtualnvOperator FileNotFoundError: No such file or directory: 'hadoop'
Я пытаюсь подружить AirFlow и HDFS, так, чтобы данные забирались из папки, лежащей на hdfs(features_path), процессились скриптом и складывались в другую папку на hdfs (processed_features_path).
Работаю в облаке, в котором развернут инстанс airflow. Многих библиотек в инстансе нет, потому приходится использовать PythonVirtualenvOperator. Библиотека virtualenv установлена. Изначально даг падал с ошибкой: no module fsspec, далее no module pyarrow, теперь падает с : FileNotFoundError: [Errno 2] No such file or directory: 'hadoop'.
По мере появления ошибок добавлял библиотеки в requirements(fsspec,pyarrow,hdfs), но с последней ошибкой уже без понятия что делать.
Прошу помочь!
#imports
from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonVirtualenvOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
#configuration
DAG_NAME = "name"
args = {'owner': 'airflow',
'depends_on_past': False,
'email': ["[email protected]"],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'queue': 'airflow'}
#tasks
def process_features():
#imports
import os
import subprocess
import pandas as pd
#paths
features_path = 'hdfs://some_hdfs_path_1'
processed_features_path = 'hdfs://some_hdfs_path_2'
fillna_path = 'hdfs://some_hdfs_path_3'
final_features_path = 'hdfs://some_hdfs_path_4'
#read
df = pd.read_pickle(features_path)
features_list = pd.read_excel(final_features_path)
scorer_fillna_value = pd.read_excel(fillna_path).to_dict()
#calculate
df_model = df[features_list].fillna(scorer_fillna_value)
df_id = df[['id_col_1','id_col_2']]
#save
df_model.to_excel(os.path.join(processed_features_path,'filename_1.xlsx'))
df_id.to_excel(os.path.join(processed_features_path,'filename_2.xlsx'))
#dag
with DAG(
DAG_NAME,
default_args=args,
schedule_interval='30 23 * * *',
start_date=days_ago(1),
) as dag:
start = DummyOperator(
task_id='start',
dag=dag
)
task_process_features = PythonVirtualenvOperator(
task_id = 'process_features',
python_callable = process_features,
requirements = ['pandas==1.3.0','pyarrow==2.0.0','hdfs','fsspec'],
python_version = '3.8.7',
provide_context=True,
system_site_packages=False,
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
start >> task_process_features >> end