Как через SQLAlchemy обновить значения в таблице postgres значениями вычисленными в коде python

У меня есть данные в таблице doc которая хранится в БД postgres и некий алгоритм обработки написанный на python. При помощи SQLAlchemy ORM я извлекаю нужные мне данные из этой таблицы, делаю свои расчеты и в результате у меня получается pandas с 2-мя столбцами - id (это id-шники строк из таблицы doc) и result(полученные алгоритмом значения).

Далее мне нужно вставить полученные результаты (данные из result) в таблицу doc (в столбец result_algo, который уже создан в таблице, но пока все значения в нем null), в соответствии с id (т.е. id результата в пандасе, должен соответствовать id в таблице doc) но я не понимаю как это сделать...

Попробовал поискать в документации, посмотреть в инете, но так и не понял, как это сделать. Была идея, создать скопировать pandas в отдельную таблицу в postgres (через to_sql()), потом сделать inner join по id, и удалить ненужную таблицу. Но этот вариант мне не очень нравится, есть ли возможность не создавая новые таблицы вставить данные в столбец уже существующей таблицы? (Используя SQLAlchemy ORM)


Ответы (2 шт):

Автор решения: user453575457

SQLALchemy умеет сохранять только данные (строки) которые были вытянуты из бд самой SQLAlchchemy. И теоритически вам необходимо вытянуть данные с помощью SQLAlchemy, каждую запись вытащенную таким образом обновить (изменить в этой записи значения) и потом сохранять.

для SQLAlchemy будет примерно так:

with engine.connect() as conn:
     results = conn.execute(stmt).fetchall()
     for db_data in results:
        new_data = df.filter(id=db_data.id)// в пандас выбираем нужную строку

       db_data.result_algo = new_data.result_algo

       conn.commit() // хотя это можно опустить, оно автоматически закоммитится при завершении подключения

Но более верная альтернатива будет использование для обработки данных pyspark

from pyspark.sql import SparkSession


url = "jdbc:sqlserver://<hostname>:<port>;database=<database_name>"
properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
// Чтение таблицы из базы
table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

//А здесь мы меняете df как хотите, по типу pandas
@udf
def to_upper(s):
  if s is not None:
   return s.upper()

df.withColumn("result_algo", to_upper("another_filed"))


//Запись в базу
mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)
→ Ссылка
Автор решения: oocmd

Вот такой вариант у меня сработал

for index, row in res_df.iterrows():
        session.connection().execute(update(Doc).where(Doc.id == row["id"]), {"result_algo": row["result"]})
session.commit()
→ Ссылка