Как сделать update таблицы в БД массивом?
Подскажите пожалуйста, как можно сделать update таблицы используя массивы pd.DataFrame?
Сохранился фрагмент старой функции которая сравнивала значения таблицы из БД и таблицы которая приходит по API:
with self.connection.cursor() as cursor:
# Values - значения датафрейма полученные по API.
for i in values:
cursor.execute(f"""SELECT *
FROM {schema}.{tname}
WHERE article='{i[1]}';""")
row = cursor.fetchall()
# Проверка на идентичность. Проверялись только необходимые столбцы.
if not functools.reduce(lambda x, y: x and y,
map(lambda p, q: p == q,
(row[0][:3] + row[0][4:]),
tuple(i[:3]) + tuple(i[4:])), True):
# Если имелись расхождения, то циклом проверялось идентичность каждой ячейки и если значение отличалось, заменялось.
for cell, new_cell, column_name in \
zip(row[0][:3] + row[0][4:], tuple(i[:3]) + tuple(i[4:]),
tuple(column_name_list[:3] + column_name_list[4:])):
if cell != new_cell:
cursor.execute(f"""UPDATE {schema}.{tname}
SET {column_name[0]}='{new_cell}'
WHERE article='{i[1]}'""")
Такой способ обновления данных крайне неэффективен, особенно когда дело доходит до таблиц с большим количеством записей. Помимо этого нужно подобную логику адаптировать под каждую таблицу, что я считаю некорректно, наверняка можно сделать унифицировано.
Сейчас я пытаюсь реализовать следующую функцию:
def update_table(self, dataframe: pd.DataFrame, dataframe_db: pd.DataFrame, tname: str, schema: str = None, pk: list or str = "*"):
dataframe_comp = dataframe.compare(dataframe_db)
# UPDATE ACTION HERE
Но в голову приходит только пройти циклом по датафрейму сравнения, что в целом, то же самое что было ранее, но уже можно использовать для разных таблиц. Подскажите пожалуйста, какие есть инструменты в psycopg2, SQLAlchemy которые смогут обновить данные, возможно без цикла, либо, если циклом, то сделали бы это более эффективно.
Ответы (1 шт):
Попробую набросать логику без SQLAlchemy
Подготовим тестовую схему
create table public.my_test_data
(
id integer not null
primary key,
name varchar,
some_property varchar,
price numeric default 0 not null
);
create table public.test_data_price_history
(
item_id integer
references public.my_test_data
-- Так себе отсылка, но пришлось как-то обыгрывать ваше желание удалять записи
-- при условии сохранения истории цены
on update cascade on delete set null,
old_value numeric,
new_value numeric,
datetime timestamp default (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'::text)
);
create function price_history_update() returns trigger
as
$$
begin
if tg_op = 'INSERT' then
insert into test_data_price_history (item_id, new_value) values (new.id, new.price);
elseif tg_op = 'UPDATE' then
insert into test_data_price_history (item_id, old_value, new_value) values (new.id, old.price, new.price);
end if;
return new;
end;
$$ language plpgsql;
create trigger price_history_new_item
after insert
on public.my_test_data
for each row
execute procedure public.price_history_update();
create trigger price_history_upd_item
after update
on public.my_test_data
for each row
when (new.price IS DISTINCT FROM old.price)
execute procedure public.price_history_update();
create type incoming_json as
(
id int,
name varchar,
some_property varchar,
price numeric
);
Теперь собственно Python
import pandas as pd
import psycopg2 as pg
df1 = pd.DataFrame(
{
'id': [1, 2, 3, 4, 5],
'name': ['N1', 'N2', 'N3', 'N4', 'N5'],
'some_property': ['A', 'B', 'C', 'D', 'A\'E'],
'price': [45.62, 41.13, 125.04, 16.08, 11.16]
}
)
def upsert(df: pd.DataFrame):
payload = df.to_json(orient='records')
with pg.connect('user=me dbname=me host=192.168.150.1') as conn:
with conn.cursor() as cur:
try:
cur.execute(
"""
with recursive data as (select * from jsonb_populate_recordset(null::incoming_json, %s::jsonb)),
_ as (
insert into my_test_data select * from data on conflict (id)
do update set name = excluded.name,
some_property = excluded.some_property,
price = excluded.price)
delete
from my_test_data mtd
where not exists(select 1
from data d
where d.id = mtd.id)
""",
[payload]
)
except Exception:
cur.connection.rollback()
raise
else:
cur.connection.commit()
upsert(df1)
my_test_data
| id | name | some_property | price |
|---|---|---|---|
| 1 | N1 | A | 45.62 |
| 2 | N2 | B | 41.13 |
| 3 | N3 | C | 125.04 |
| 4 | N4 | D | 16.08 |
| 5 | N5 | A'E | 11.16 |
test_data_price_history
| item_id | old_value | new_value | datetime |
|---|---|---|---|
| 1 | null | 45.62 | 2023-05-16 23:52:31.508051 |
| 2 | null | 41.13 | 2023-05-16 23:52:31.508051 |
| 3 | null | 125.04 | 2023-05-16 23:52:31.508051 |
| 4 | null | 16.08 | 2023-05-16 23:52:31.508051 |
| 5 | null | 11.16 | 2023-05-16 23:52:31.508051 |
df2 = pd.DataFrame(
{
'id': [1, 2, 3, 4, 6],
'name': ['N1', 'N2', 'N3', 'N4', 'N6'],
'some_property': ['A', 'B', 'C', 'D', 'B\'E'],
'price': [45.62, 41.13, 113.03, 16.08, 88.0]
}
)
upsert(df2)
my_test_data
| id | name | some_property | price |
|---|---|---|---|
| 1 | N1 | A | 45.62 |
| 2 | N2 | B | 41.13 |
| 3 | N3 | C | 113.03 |
| 4 | N4 | D | 16.08 |
| 6 | N6 | B'E | 88 |
test_data_price_history
| item_id | old_value | new_value | datetime |
|---|---|---|---|
| 1 | null | 45.62 | 2023-05-16 23:52:31.508051 |
| 2 | null | 41.13 | 2023-05-16 23:52:31.508051 |
| 3 | null | 125.04 | 2023-05-16 23:52:31.508051 |
| 4 | null | 16.08 | 2023-05-16 23:52:31.508051 |
| null | null | 11.16 | 2023-05-16 23:52:31.508051 |
| 3 | 125.04 | 113.03 | 2023-05-16 23:55:09.474256 |
| 6 | null | 88 | 2023-05-16 23:55:09.474256 |