Как добавить новую библиотеку в Airflow?

только начинаю изучать Airflow, возник вопрос. Поставил airflow с docker-compose при написании dag'a использую библиотеку pandas-datareader. Вылезает такая ошибка:

Broken DAG: [/opt/airflow/dags/full_data_dag.py] Traceback (most recent call last):
  File "/opt/airflow/dags/full_data_dag.py", line 5, in <module>
    from project import get_full_data
  File "/opt/airflow/dags/project/get_full_data.py", line 6, in <module>
    import pandas_datareader as web
ModuleNotFoundError: No module named 'pandas_datareader'

Код самого dag'а:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from project import get_full_data
from project import preprocessing
from project import model_train

dag = DAG('full_process', description='Loading full data, train model',
          schedule_interval='@once',
          start_date=datetime(2021, 11, 19), catchup=False)

def get_data():
    get_full_data

get_data = PythonVirtualenvOperator(
    task_id='get_data',
    python_callable=get_data,
    requirements=['pandas_datareader'],
    dag=dag)

def make_preprocessing():
    preprocessing

preprocess = PythonVirtualenvOperator(
    task_id='preprocessing',
    python_callable=make_preprocessing,
    requirements=['pandas, numpy, sklearn, joblib'],
    dag=dag)

def train():
    model_train

training = PythonVirtualenvOperator(
    task_id='model_training',
    python_callable=train,
    requirements=['numpy, sklearn, joblib'],
    dag=dag)

get_data >> preprocess >> training

Оператор get_data выполняет python файл, в котором используется библиотека pandas_datareader

Подскажите пожалуйста, как я могу добавить новую библиотеку?


Ответы (4 шт):

Автор решения: AlTheOne

В системе не хватает зависимости pandas-datareader

Для установки введите в терминал следующую команду:

pip install pandas-datareader

Документация к зависимости:
https://pandas-datareader.readthedocs.io/en/latest/

Можете указать установку зависимости в своём docker файле.

→ Ссылка
Автор решения: Adventer

Решил свой вопрос. При использовании кода из другого файла, нужно в этом файле все обернуть в функцию. Напишу пример.

Так работать не будет:

import pandas as pd
pd.read_csv(...)

А так работать будет:

def pandas_read():
    import pandas as pd
    pd.read_csv(...)

А в самом dag'e делать импорт этой функции из файла

from project.file_name import pandas_read

И в операторе запускать уже функцию pandas_read

→ Ссылка
Автор решения: Илья Андерсон

А так работать будет:

def pandas_read():
    import pandas as pd
    pd.read_csv(...)

Будет работать или ошибка будет в логах аирфлоу, а не при импорте дага?

→ Ссылка
Автор решения: Илья Андерсон

Перенести импорт в тело функции, чтобы не было ошибок при импорте дага.

Чтобы установить библиотеки, надо расширить образ. Создав Dockerfile с примерно таким содержанием (зависит от версии):

FROM apache/airflow:2.10.2
COPY requirements.txt /requirements.txt
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r /requirements.txt

Собрать новый образ:

$ docker build . --tag extending_airflow:latest 

Указать его в docker-compose:

image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:latest}

Ну и, естественно, пересобрать:

$ docker-compose up
→ Ссылка