Как выполнить хранимую процедуру PostgreSQL в Python?
Для примера создадим такую процедура, в которой есть и commit и rollback операции
create or replace procedure a2(
in a smallint,
out b smallint
)
LANGUAGE plpgsql
AS $$
BEGIN
b:=1;
if (a=1) then
commit;
return;
end if;
if (a=2) then
rollback;
b:=2;
else
b:=3;
end if;
END; $$;
Если выполнить эту процедуру через, средства разработки для баз данных(DataGrip), то она будет выполняться без ошибок.
CALL a2(1::smallint,null::smallint);
CALL a2(2::smallint,null::smallint);
CALL a2(3::smallint,null::smallint);
Но если выполнять туже самую процедуру в Python через psycopg2 или asyncpg, происходят разного рода ошибки. Без ошибок выполняются процедуры в которых нет операции commit и rollback, но мне нужен такой функционал.
ps_connection = psycopg2.connect(
user="postgres",
password="denis",
host="127.0.0.1",
port="5432",
database="testing_xab2"
)
cursor = ps_connection.cursor()
cursor.callproc('a2', [72, 10])
result = cursor.fetchall()
print()
""" Ошибка
psycopg2.errors.UndefinedFunction: function a2(integer, integer) does not exist
LINE 1: SELECT * FROM a2(72,10)
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
"""
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncConnection, AsyncEngine
from sqlalchemy import text
import asyncio
async def main(url, raw_sql):
engine = create_async_engine(url, encoding='utf8')
async with engine.begin() as conn:
cursor = await conn.execute(text(raw_sql))
result = cursor.fetchall()
print(result)
asyncio.run(main(
'postgresql+asyncpg://postgres:denis@localhost:5432/testing_xab2'
, 'CALL a2(2::smallint,null::smallint);'
)
)
""" Ошибка
sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.InvalidTransactionTerminationError'>: invalid transaction termination
[SQL:
CALL a2(2::smallint,null::smallint);
]
(Background on this error at: https://sqlalche.me/e/14/dbapi)
"""
Ответы (1 шт):
Автор решения: Денис Кустов
→ Ссылка
Нужно включить авто коммит
Для asyncpg это делается так (Взял пример из https://python-gino.org/docs/en/1.1b2/explanation/sa20.html)
async def main(url, raw_sql):
engine = create_async_engine(
url, encoding='utf8',
# ВОТ ВКЛЮЧАЕМ АВТО КОММИТ
isolation_level="AUTOCOMMIT"
)
async with engine.begin() as conn:
cursor = await conn.execute(text(raw_sql))
result = cursor.fetchall()
print(result)
Для psycopg2 (взял пример из https://github.com/psycopg/psycopg2/issues/857)
def main2():
ps_connection = psycopg2.connect(
# dsn=SQL.URL
user="postgres",
password="denis",
host="127.0.0.1",
port="5432",
database="testing_xab2"
)
# ВОТ ВКЛЮЧАЕМ АВТО КОММИТ
ps_connection.autocommit = True
cursor = ps_connection.cursor()
cursor.execute('CALL a2(2::smallint,null::smallint);')
result = cursor.fetchall()
print(result)
БОНУС для использования в session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
async def main(url, raw_sql):
engine = create_async_engine(url, encoding='utf8', isolation_level="AUTOCOMMIT")
async_session= sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
cursor=await session.execute(raw_sql)
result = cursor.fetchall()
print(result)