airflow условие
Мне нужно добавить условие, если в базе данных у меня нет нужных значений то мы скипаем последующие шаги. Как это реализовать в airflow? Шаги можно скипать по результатам прошлых шагов например так
finally_ = PythonOperator(
task_id="finally",
python_callable=_finally,
trigger_rule=TriggerRule.ONE_FAILED,
provide_context=True,
op_kwargs={'dag_id': dag.dag_id}
)
Но как сделать что бы это работала с условием if например
if df_main.empty:
пропустить шаг
Ответы (1 шт):
Автор решения: Peter383
→ Ссылка
Это делается при помощи BranchPythonOperator. Вот пример. Функция _isExists_post - пропсывает логику там условие, возвращает название тасков, которые отрабатывают в зависимости от наличия данных.
# в зависимости от того, есть ли комментарии, выбирается задача
# в случае, если комментарии есть - загружаем, если нет - пустая задача
def _isExists_posts(id_school, **context):
count_posts = context["task_instance"].xcom_pull(task_ids=f"extract_posts_{id_school}",
key=f"count_vk_posts_load_{id_school}")
if int(count_posts) > 0:
return f"load_posts_{id_school}"
else:
return f"remove_sql_{id_school}"
# ветвление (если постов нет, то далее - пустая задача)
pick_branch_posts = BranchPythonOperator(
task_id=f"isExistsPosts_{school_id}",
python_callable=_isExists_posts,
do_xcom_push=False,
op_kwargs={
"id_school": school_id
},
dag=dag_vk
)