Как переписать Python код под DAG Airflow?
Помогите, пожалуйста.
У меня есть Python код выполняющий следующее действие в обычном Jupiter notebook:
d = datetime.now().date()
def transform_data(**kwargs):
внутри этой функции осуществляется подключение к PostgreSQL, далее происходит преобразования данных, а затем сохранение результата в витрину MS SQL:
postgres = PostgresHook(postgres_conn_id='connection_1')
conn_1 = postgres.get_conn()
mssql = MsSqlHook(mssql_conn_id='connection_2', schema= 'schema')
conn_2 = mssql.get_conn()
sql = """ преобразования1 """
zapad = pd.read_sql(sql, con=conn_1)
sever = zapad.loc[zapad['column_name']=="name",:].reset_index(drop=True)
sever.to_sql('NAME',con=conn_2,index=False,if_exists='replace')
def get_name_by_status(status,user,plan):sn,su = status+'_by',status+'_by_name' user.columns=[sn, su, 'phone', 'email', 'adress', 'role'] user[sn] = user[sn].apply(lambda x: str(x)) plan = plan.merge(user.loc[:,[sn, su]], how='left', on=sn) return plandef save_tasks(d):sql = f"""select * from events where date_start <='{d}' and date_end >='{d}' """ wt = pd.read_sql(sql, con=conn_1) wt['dt'] = d conn_2.execute(f"delete from EVENTS where dt='{d}'") wt.loc[~wt['column_name_1'] & wt['column_name_2']\ .isin(sever["column_name_3"].unique()),['column_name_4','column_name_5','dt']]\ .to_sql("EVENTS",con=conn_2,index=False,if_exists='append') sql = """ преобразования """ plan = pd.read_sql(sql, con=conn_1) plan = plan.loc[plan['column_name_1'].isin(sever['column_name_2'].unique())] for status in ['planed','inwork','finished']: plan = get_name_by_status(status,user,plan) plan["dt"]=d try: conn_2.execute(f"delete from WORKS where dt='{d}'") except: print('no table WORKS') plan.to_sql("WORKS",con=conn_2,index=False,if_exists='append')
Итого получается 3 функции. В Jupiter notebook все работает без проблем, но как только пытаюсь обернуть этот код в DAG - ошибка на ошибке. Я уже все перепробовал, не понимаю как сделать так, чтобы DAG в Airflow заработал, при этом выполнился весь код, в результате работы которого должны обновиться все таблиц в БД MS SQL - NAME, EVENTS, WORKS. Благодарю за любую помощь и подсказку.
Ответы (1 шт):
Заработало вот так:
from datetime import datetime
import datetime as dt
import pandas as pd
from airflow import DAG
from airflow.utils.db import provide_session
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
d = datetime.now().date()
def get_name_by_status(status,user,plan):
sn,su = status+'_by',status+'_by_name'
user.columns=[sn, su, 'phone', 'email', 'adress', 'role']
user[sn] = user[sn].apply(lambda x: str(x))
plan = plan.merge(user.loc[:,[sn, su]], how='left', on=sn)
return plan
with DAG(
dag_id='dag_name',
default_args = {'owner': 'airflow'},
schedule_interval='30 3,9,15 * * 1-5',
start_date=datetime(2023, 4, 28),
tags=['tag', 'tag', 'tag'],
catchup=False,
) as dag:
def get_and_save_data(**kwargs):
def save_tasks(d,conn_1,conn_2,mssql):
curr = mssql.get_conn().cursor()
sql = f"""select * from events where date_start <='{d}' and date_end >='{d}'
"""
wt = pd.read_sql(sql, con=conn_1)
wt['dt'] = d
curr.execute(f"delete from EVENTS where dt='{d}'")
wt.loc[~wt['column_name_1'] & wt['column_name_2']\
.isin(sever["column_name_3"].unique()),
['column_name_4','column_name_5','dt']]\
.to_sql("EVENTS",con=conn_2,index=False,if_exists='append')
sql = """ преобразования """
plan = plan.loc[plan['column_name_1'].isin(sever['column_name_2'].unique())]
plan = pd.read_sql(sql, con=conn_1)
if plan.shape[0]>0:
"""различные условия"""
for status in ['planed','inwork','finished']:
plan = get_name_by_status(status,user,plan)
plan["dt"]=d
"""различные условия"""
try:
curr.execute(f"delete from WORKS where dt='{d}'")
except:
print('no table WORKS')
plan.to_sql("WORKS",con=conn_2,index=False,if_exists='append')
postgres = PostgresHook(postgres_conn_id='connection_1')
conn_1 = postgres.get_conn()
mssql = MsSqlHook(mssql_conn_id='connection_2', schema= 'schema')
conn_2 = get_conn(mssql,"mssql+pymssql")
sql = """ преобразования1 """
zapad = pd.read_sql(sql, con=conn_1)
sever = zapad.loc[zapad['column_name']=="name",:].reset_index(drop=True)
sever.to_sql('NAME',con=conn_2,index=False,if_exists='replace')
start = d +dt.timedelta(days=-3)
stop = d
while start<=stop:
save_tasks(start,conn_1,conn_2,mssql)
start = start + dt.timedelta(days=1)
t_save_tasks = PythonOperator(
task_id='save_tasks',
python_callable= get_and_save_data,
dag=dag
)
t_save_tasks
Одну функцию вынес за DAG, две другие функции вложил одна в другую. 2 недели ушло на то)