Как ускорить загрузку данных данных в БД Postgres
У меня стоит задача при помощи Airflow перелить данные из БД Oracle в БД Postgres. Структура таблицы идентична. Объем данных очень большой.
Решение ниже рабочее, но скорость работы не очень высокая - 50 млн строк (70 столбцов) переливаются где-то за 1.5 часа. Можно ли как-то ускорить подобный код?
def parallel_copy(df, engine, trg_schema, trg_table):
engine.dispose()
dbapi_conn = engine.connect().connection
csv_buffer = io.StringIO()
with dbapi_conn.cursor() as cur:
df.to_csv(csv_buffer, index=False, header=False)
csv_buffer.seek(0)
columns = ', '.join(['"{}"'.format(k) for k in df.columns.tolist()])
sql = "COPY {} ({}) FROM STDIN WITH CSV".format(f'{trg_schema}.{trg_table}', columns)
cur.copy_expert(sql=sql, file=csv_buffer)
dbapi_conn.commit()
dbapi_conn.close()
def copy_to(engine, trg_schema, trg_table, w_ldate, load_id, w_optype):
# Количество строк
row_count = 0
#Генератор записей из таблицы источника
df_generator = db_hook.get_pandas_df_by_chunks(sql='SELECT * FROM ...', chunksize=1000000)
# Заливка в приемник
for df in df_generator:
df = df.rename(columns=str.lower)
df = df.convert_dtypes()
row_count = row_count + df.shape[0]
# Загрузка в несколько потоков
n_split = 10
processes = [Process(target=parallel_copy, args=(part, engine, trg_schema, trg_table, )) for part in np.array_split(df, n_split)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
return row_count