Сохранить типы данных колонок из исходной БД в dataframe pandas
Перекладываю данные из одной БД (postgresql) в другую (greenplum) с использованием датафрейма pandas, как временного хранилища данных в реалтайме. Столкнулся с тем, что при вставке данных в целевую БД через df.to_sql() возникла ошибка несовпадения типов:
psycopg2.errors.DatatypeMismatch column "created_at" is of type bigint but expression is of type text
Вывел типы данных датафрейма через dtypes и оказалось, что все колонки кроме одной стали просто object
.
С помощью данного обсуждения удалось сделать рабочий код с явным указанием ключей для to_sql в dtype:
dtypes = {
"product_id": sqlalchemy.types.BIGINT(),
"id": sqlalchemy.types.VARCHAR(length=256),
"created_at": sqlalchemy.types.BIGINT()
# и другие колонки
}
df.to_sql(
table,
db_engine,
if_exists='replace',
index=False,
schema=TMP_SCHEMA,
method='multi',
chunksize=CHUNKSIZE,
dtype=dtypes
)
Однако такой способ не подходит для дальнейшей работы с ещё несколькими таблицами - хотелось бы автоматически подставлять типы данных исходных колонок.
В указанном ответе приведен пример маппинга первой строки csv. В моей же БД могут попадаться NULL, что помешает определить тип данных. Есть ли способ решить вопрос с сохранением исходных типов данных без необходимости для каждой таблицы вручную их указывать?
Ответы (1 шт):
По наводке @strawdog - данные о типах колонок в PostgreSQL можно получить из системной таблицы information_schema.columns
. Для этого имеются колонки data_type и udt_name - мне удобны данные из udt_name:
sql_text = f'''
select column_name, udt_name
from information_schema.columns
where table_name = '{table}';
'''
with src_engine.connect() as conn:
df = pd.read_sql(sqlalchemy.text(sql_text), conn)
column_types = dict(zip(df.column_name, df.udt_name))
В результате получаем словарь column_types
, где ключами будут названия колонок, а значениями их типы. Используем полученный словарь для замены строк с названиями типов данных на классы из SQLAlchemy:
def mapping_types(column_types_dict):
dtypedict = {}
for key, value in column_types_dict.items():
if 'int' in value:
dtypedict[key] = sqlalchemy.types.BIGINT()
if 'varchar' in value:
dtypedict[key] = sqlalchemy.types.VARCHAR(length=256)
# и другие требуемые типы данных
return dtypedict
Новый словарь можем передавать в параметр dtype
функции df.to_sql(), которую используем для отправки данных в другую БД - pandas укажет переданные типы данных.