Как добавить новую библиотеку в Airflow?
только начинаю изучать Airflow, возник вопрос. Поставил airflow с docker-compose при написании dag'a использую библиотеку pandas-datareader. Вылезает такая ошибка:
Broken DAG: [/opt/airflow/dags/full_data_dag.py] Traceback (most recent call last):
File "/opt/airflow/dags/full_data_dag.py", line 5, in <module>
from project import get_full_data
File "/opt/airflow/dags/project/get_full_data.py", line 6, in <module>
import pandas_datareader as web
ModuleNotFoundError: No module named 'pandas_datareader'
Код самого dag'а:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from project import get_full_data
from project import preprocessing
from project import model_train
dag = DAG('full_process', description='Loading full data, train model',
schedule_interval='@once',
start_date=datetime(2021, 11, 19), catchup=False)
def get_data():
get_full_data
get_data = PythonVirtualenvOperator(
task_id='get_data',
python_callable=get_data,
requirements=['pandas_datareader'],
dag=dag)
def make_preprocessing():
preprocessing
preprocess = PythonVirtualenvOperator(
task_id='preprocessing',
python_callable=make_preprocessing,
requirements=['pandas, numpy, sklearn, joblib'],
dag=dag)
def train():
model_train
training = PythonVirtualenvOperator(
task_id='model_training',
python_callable=train,
requirements=['numpy, sklearn, joblib'],
dag=dag)
get_data >> preprocess >> training
Оператор get_data выполняет python файл, в котором используется библиотека pandas_datareader
Подскажите пожалуйста, как я могу добавить новую библиотеку?
Ответы (4 шт):
В системе не хватает зависимости pandas-datareader
Для установки введите в терминал следующую команду:
pip install pandas-datareader
Документация к зависимости:
https://pandas-datareader.readthedocs.io/en/latest/
Можете указать установку зависимости в своём docker файле.
Решил свой вопрос. При использовании кода из другого файла, нужно в этом файле все обернуть в функцию. Напишу пример.
Так работать не будет:
import pandas as pd
pd.read_csv(...)
А так работать будет:
def pandas_read():
import pandas as pd
pd.read_csv(...)
А в самом dag'e делать импорт этой функции из файла
from project.file_name import pandas_read
И в операторе запускать уже функцию pandas_read
А так работать будет:
def pandas_read():
import pandas as pd
pd.read_csv(...)
Будет работать или ошибка будет в логах аирфлоу, а не при импорте дага?
Перенести импорт в тело функции, чтобы не было ошибок при импорте дага.
Чтобы установить библиотеки, надо расширить образ.
Создав Dockerfile с примерно таким содержанием (зависит от версии):
FROM apache/airflow:2.10.2
COPY requirements.txt /requirements.txt
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r /requirements.txt
Собрать новый образ:
$ docker build . --tag extending_airflow:latest
Указать его в docker-compose:
image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:latest}
Ну и, естественно, пересобрать:
$ docker-compose up