Как сделать SQLAlchemy асинхронный генератор сессий?
Мне нужен метод класса, который будет возвращать сессию. Если случилось исключение, то вызывается "session.rollback()" и "session.close()".
class DatabaseHelper:
def __init__(
self,
url: str,
echo: bool = False,
) -> None:
self.engine = create_async_engine(
url=url,
echo=echo,
)
self.session_factory = async_sessionmaker(
bind=self.engine,
)
async def get_session(self):
# some code
yield self.session_factory()
Я пытался сделать это сам, но я получаю исключение:
main.py
async def main():
session = db.get_session()
group = await session.scalar(select(Group).where(Group.tg_id == 1))
Traceback
Traceback (most recent call last):
File "C:\Users\Pingvinus\Documents\Projects\telegram-bot-database\telegram_bot_database\test.py", line 63, in <module>
asyncio.run(main())
File "C:\Users\Pingvinus\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\Pingvinus\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Pingvinus\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Users\Pingvinus\Documents\Projects\telegram-bot-database\telegram_bot_database\test.py", line 49, in main
group1 = await session.scalar(
^^^^^^^^^^^^^^
AttributeError: 'async_generator' object has no attribute 'scalar'
Ответы (1 шт):
Автор решения: sxmrxk
→ Ссылка
Я бы посоветовал создать контекстный менеджер.
Инициализируем engine
и session maker
:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
url=db_settings.DSN,
echo=True,
)
async_session_maker = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False)
И вот пример реализации асинхронного контекстного менеджера:
class SessionContextManager:
def __init__(self) -> None:
self.session_factory = async_session_maker
self.session = None
async def __aenter__(self) -> None:
self.session = self.session_factory()
async def __aexit__(self, *args: object) -> None:
await self.rollback()
async def commit(self) -> None:
await self.session.commit()
await self.session.close()
self.session = None
async def rollback(self) -> None:
await self.session.rollback()
await self.session.close()
self.session = None
И вот так его можно использовать у себя в коде:
# uow можно инициировать прямо внутри функции посредством manager = SessionContextManager
async def create_something(manager: SessionContextManager, some_data: dict):
async with manager:
# Если внутри этого блока возникнет исключение блок завершится посредством __aexit__, следовательно сделается rollback
some_id = await manager.session.execute(
insert(SomeModel).values(some_data).returning(SomeModel.id)
)
# Выполнение метода commit является обязательным для того чтобы явно зафиксировать изменения
await manager.commit()
return some_id