Как через SQLAlchemy обновить значения в таблице postgres значениями вычисленными в коде python
У меня есть данные в таблице doc которая хранится в БД postgres и некий алгоритм обработки написанный на python. При помощи SQLAlchemy ORM я извлекаю нужные мне данные из этой таблицы, делаю свои расчеты и в результате у меня получается pandas с 2-мя столбцами - id (это id-шники строк из таблицы doc) и result(полученные алгоритмом значения).
Далее мне нужно вставить полученные результаты (данные из result) в таблицу doc (в столбец result_algo, который уже создан в таблице, но пока все значения в нем null), в соответствии с id (т.е. id результата в пандасе, должен соответствовать id в таблице doc) но я не понимаю как это сделать...
Попробовал поискать в документации, посмотреть в инете, но так и не понял, как это сделать. Была идея, создать скопировать pandas в отдельную таблицу в postgres (через to_sql()), потом сделать inner join по id, и удалить ненужную таблицу. Но этот вариант мне не очень нравится, есть ли возможность не создавая новые таблицы вставить данные в столбец уже существующей таблицы? (Используя SQLAlchemy ORM)
Ответы (2 шт):
SQLALchemy умеет сохранять только данные (строки) которые были вытянуты из бд самой SQLAlchchemy. И теоритически вам необходимо вытянуть данные с помощью SQLAlchemy, каждую запись вытащенную таким образом обновить (изменить в этой записи значения) и потом сохранять.
для SQLAlchemy будет примерно так:
with engine.connect() as conn:
results = conn.execute(stmt).fetchall()
for db_data in results:
new_data = df.filter(id=db_data.id)// в пандас выбираем нужную строку
db_data.result_algo = new_data.result_algo
conn.commit() // хотя это можно опустить, оно автоматически закоммитится при завершении подключения
Но более верная альтернатива будет использование для обработки данных pyspark
from pyspark.sql import SparkSession
url = "jdbc:sqlserver://<hostname>:<port>;database=<database_name>"
properties = {
"user": "<username>",
"password": "<password>",
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
// Чтение таблицы из базы
table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)
//А здесь мы меняете df как хотите, по типу pandas
@udf
def to_upper(s):
if s is not None:
return s.upper()
df.withColumn("result_algo", to_upper("another_filed"))
//Запись в базу
mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)
Вот такой вариант у меня сработал
for index, row in res_df.iterrows():
session.connection().execute(update(Doc).where(Doc.id == row["id"]), {"result_algo": row["result"]})
session.commit()