Динамические и фиксированные параметры Airflow
Как Airflow работает с переменными, передающимися в таски?
Есть простой код для примера:
from datetime import datetime, timedelta
import time
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from af_etl_log import *
from af_etl_params import *
AF_NAME = os.path.splitext(os.path.basename(__file__))[0]
default_args = {
"owner": "airflow", # владелец поумолчанию
"start_date": "2020-01-01 00:00:00", # дата старта работы дага
"depends_on_past": "False",
"provide_context": "True",
"is_paused_upon_creation": "True"
}
def wait_f(datte):
delay_var = 60 * 2
print(datte)
time.sleep(delay_var)
def print_f(datte):
print(datte)
with DAG(
dag_id=AF_NAME,catchup=False,
default_args=default_args,
on_success_callback=job_success_callback,
on_failure_callback=job_error_callback,
is_paused_upon_creation=True,
schedule_interval="*/3 * * * *",
) as dag:
datte = datetime.now()
waitt = PythonOperator(
task_id='waitt',
python_callable=lambda **a: wait_f(datte),
dag=dag
)
printt = PythonOperator(
task_id='printt',
python_callable=lambda **a: print_f(datte),
dag=dag
)
waitt >> printt
Какое ожидается поведение дага:
- при каждом запуске инстанса дага высчитывается значение переменной
datte. И фиксируется. - запускается первый таск, который выводит значение переменной
datteи ждет целые сутки. - запускается второй таск, который выводит значение переменной
datte. И оно должно быть точно такое же, к в первом таске.
Вопросы:
- Если я правильно понимаю, то даг постоянно пересоздается по указанному в py-скрипте коду. Приведет ли это к тому, что переменная
datteдля разных тасков будет разной (одна будет указывать на "вчера", другая - на "завтра")? - Как создать переменную, которая будет обновляться для каждого инстанса (запуска) дага?
- Как создать переменную, которая рассчитается один раз и зафиксируется. И больше не будет изменяться в процессе работы дага?