airflow условие

Мне нужно добавить условие, если в базе данных у меня нет нужных значений то мы скипаем последующие шаги. Как это реализовать в airflow? Шаги можно скипать по результатам прошлых шагов например так

finally_ = PythonOperator(
    task_id="finally",
    python_callable=_finally,
    trigger_rule=TriggerRule.ONE_FAILED,
    provide_context=True,
    op_kwargs={'dag_id': dag.dag_id}

)

Но как сделать что бы это работала с условием if например

if df_main.empty:
   пропустить шаг

Ответы (1 шт):

Автор решения: Peter383

Это делается при помощи BranchPythonOperator. Вот пример. Функция _isExists_post - пропсывает логику там условие, возвращает название тасков, которые отрабатывают в зависимости от наличия данных.

# в зависимости от того, есть ли комментарии, выбирается задача
# в случае, если комментарии есть - загружаем, если нет - пустая задача
def _isExists_posts(id_school, **context):
    count_posts = context["task_instance"].xcom_pull(task_ids=f"extract_posts_{id_school}",
                                                     key=f"count_vk_posts_load_{id_school}")
    if int(count_posts) > 0:
        return f"load_posts_{id_school}"
    else:
        return f"remove_sql_{id_school}"

    # ветвление (если постов нет, то далее - пустая задача)
    pick_branch_posts = BranchPythonOperator(
        task_id=f"isExistsPosts_{school_id}",
        python_callable=_isExists_posts,
        do_xcom_push=False,
        op_kwargs={
            "id_school": school_id
        },

        dag=dag_vk

    )
→ Ссылка