Динамические и фиксированные параметры Airflow

Как Airflow работает с переменными, передающимися в таски?

Есть простой код для примера:

from datetime import datetime, timedelta
import time
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from af_etl_log import *
from af_etl_params import *

AF_NAME = os.path.splitext(os.path.basename(__file__))[0]

default_args = {
    "owner": "airflow",  # владелец поумолчанию
    "start_date": "2020-01-01 00:00:00",  # дата старта работы дага
    "depends_on_past": "False",
    "provide_context": "True",
    "is_paused_upon_creation": "True"
}


def wait_f(datte):
    delay_var = 60 * 2
    print(datte)
    time.sleep(delay_var)


def print_f(datte):
    print(datte)


with DAG(
    dag_id=AF_NAME,catchup=False,
    default_args=default_args,
    on_success_callback=job_success_callback,
    on_failure_callback=job_error_callback,
    is_paused_upon_creation=True,
    schedule_interval="*/3 * * * *",
) as dag:
    datte = datetime.now()

    waitt = PythonOperator(
        task_id='waitt',
        python_callable=lambda **a: wait_f(datte),
        dag=dag
    )
    printt = PythonOperator(
        task_id='printt',
        python_callable=lambda **a: print_f(datte),
        dag=dag
    )
    waitt >> printt

Какое ожидается поведение дага:

  1. при каждом запуске инстанса дага высчитывается значение переменной datte. И фиксируется.
  2. запускается первый таск, который выводит значение переменной datte и ждет целые сутки.
  3. запускается второй таск, который выводит значение переменной datte. И оно должно быть точно такое же, к в первом таске.

Вопросы:

  1. Если я правильно понимаю, то даг постоянно пересоздается по указанному в py-скрипте коду. Приведет ли это к тому, что переменная datte для разных тасков будет разной (одна будет указывать на "вчера", другая - на "завтра")?
  2. Как создать переменную, которая будет обновляться для каждого инстанса (запуска) дага?
  3. Как создать переменную, которая рассчитается один раз и зафиксируется. И больше не будет изменяться в процессе работы дага?

Ответы (0 шт):