Как ускорить загрузку данных данных в БД Postgres

У меня стоит задача при помощи Airflow перелить данные из БД Oracle в БД Postgres. Структура таблицы идентична. Объем данных очень большой.

Решение ниже рабочее, но скорость работы не очень высокая - 50 млн строк (70 столбцов) переливаются где-то за 1.5 часа. Можно ли как-то ускорить подобный код?

def parallel_copy(df, engine, trg_schema, trg_table):
    engine.dispose()
    dbapi_conn = engine.connect().connection
    csv_buffer = io.StringIO()
    with dbapi_conn.cursor() as cur:
        df.to_csv(csv_buffer, index=False, header=False)
        csv_buffer.seek(0)
        columns = ', '.join(['"{}"'.format(k) for k in df.columns.tolist()])
        sql = "COPY {} ({}) FROM STDIN WITH CSV".format(f'{trg_schema}.{trg_table}', columns)
        cur.copy_expert(sql=sql, file=csv_buffer)
        dbapi_conn.commit()
        dbapi_conn.close()
    
def copy_to(engine, trg_schema, trg_table, w_ldate, load_id, w_optype):
    # Количество строк
  row_count = 0
  
  #Генератор записей из таблицы источника
  df_generator = db_hook.get_pandas_df_by_chunks(sql='SELECT * FROM ...', chunksize=1000000)
  
  # Заливка в приемник
  for df in df_generator:
        df = df.rename(columns=str.lower)
        df = df.convert_dtypes()
        row_count = row_count + df.shape[0]
    
    # Загрузка в несколько потоков
        n_split = 10

        processes = [Process(target=parallel_copy, args=(part, engine, trg_schema, trg_table, )) for part in np.array_split(df, n_split)]
        for proc in processes:
            proc.start()

        for proc in processes:
            proc.join()

    return row_count

Ответы (0 шт):