Сохранить типы данных колонок из исходной БД в dataframe pandas

Перекладываю данные из одной БД (postgresql) в другую (greenplum) с использованием датафрейма pandas, как временного хранилища данных в реалтайме. Столкнулся с тем, что при вставке данных в целевую БД через df.to_sql() возникла ошибка несовпадения типов:

psycopg2.errors.DatatypeMismatch column "created_at" is of type bigint but expression is of type text

Вывел типы данных датафрейма через dtypes и оказалось, что все колонки кроме одной стали просто object.

С помощью данного обсуждения удалось сделать рабочий код с явным указанием ключей для to_sql в dtype:

dtypes = {
    "product_id": sqlalchemy.types.BIGINT(),
    "id": sqlalchemy.types.VARCHAR(length=256),
    "created_at": sqlalchemy.types.BIGINT()
    # и другие колонки
}

df.to_sql(
    table,
    db_engine,
    if_exists='replace',
    index=False,
    schema=TMP_SCHEMA,
    method='multi',
    chunksize=CHUNKSIZE,
    dtype=dtypes
)

Однако такой способ не подходит для дальнейшей работы с ещё несколькими таблицами - хотелось бы автоматически подставлять типы данных исходных колонок.

В указанном ответе приведен пример маппинга первой строки csv. В моей же БД могут попадаться NULL, что помешает определить тип данных. Есть ли способ решить вопрос с сохранением исходных типов данных без необходимости для каждой таблицы вручную их указывать?


Ответы (1 шт):

Автор решения: mrgervant

По наводке @strawdog - данные о типах колонок в PostgreSQL можно получить из системной таблицы information_schema.columns. Для этого имеются колонки data_type и udt_name - мне удобны данные из udt_name:

sql_text = f'''
            select column_name, udt_name
            from information_schema.columns
            where table_name = '{table}';
            '''
with src_engine.connect() as conn:
    df = pd.read_sql(sqlalchemy.text(sql_text), conn)
column_types = dict(zip(df.column_name, df.udt_name))

В результате получаем словарь column_types, где ключами будут названия колонок, а значениями их типы. Используем полученный словарь для замены строк с названиями типов данных на классы из SQLAlchemy:

def mapping_types(column_types_dict):
    dtypedict = {}
    for key, value in column_types_dict.items():
        if 'int' in value:
            dtypedict[key] = sqlalchemy.types.BIGINT()
        if 'varchar' in value:
            dtypedict[key] = sqlalchemy.types.VARCHAR(length=256)
        # и другие требуемые типы данных
    return dtypedict

Новый словарь можем передавать в параметр dtype функции df.to_sql(), которую используем для отправки данных в другую БД - pandas укажет переданные типы данных.

→ Ссылка