Как сделать SQLAlchemy асинхронный генератор сессий?

Мне нужен метод класса, который будет возвращать сессию. Если случилось исключение, то вызывается "session.rollback()" и "session.close()".

class DatabaseHelper:
    def __init__(
        self,
        url: str,
        echo: bool = False,
    ) -> None:
        self.engine = create_async_engine(
            url=url,
            echo=echo,
        )
        self.session_factory = async_sessionmaker(
            bind=self.engine,
        )

    async def get_session(self):
        # some code
        yield self.session_factory()

Я пытался сделать это сам, но я получаю исключение:

main.py

async def main():
    session = db.get_session()
    group = await session.scalar(select(Group).where(Group.tg_id == 1))

Traceback

Traceback (most recent call last):
  File "C:\Users\Pingvinus\Documents\Projects\telegram-bot-database\telegram_bot_database\test.py", line 63, in <module>
    asyncio.run(main())
  File "C:\Users\Pingvinus\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\Pingvinus\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Pingvinus\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\Pingvinus\Documents\Projects\telegram-bot-database\telegram_bot_database\test.py", line 49, in main
    group1 = await session.scalar(
                   ^^^^^^^^^^^^^^
AttributeError: 'async_generator' object has no attribute 'scalar'

Ответы (1 шт):

Автор решения: sxmrxk

Я бы посоветовал создать контекстный менеджер.

Инициализируем engine и session maker:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    url=db_settings.DSN,
    echo=True,
)

async_session_maker = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False)

И вот пример реализации асинхронного контекстного менеджера:

class SessionContextManager:

    def __init__(self) -> None:
        self.session_factory = async_session_maker
        self.session = None

    async def __aenter__(self) -> None:
        self.session = self.session_factory()

    async def __aexit__(self, *args: object) -> None:
        await self.rollback()

    async def commit(self) -> None:
        await self.session.commit()
        await self.session.close()
        self.session = None

    async def rollback(self) -> None:
        await self.session.rollback()
        await self.session.close()
        self.session = None

И вот так его можно использовать у себя в коде:

# uow можно инициировать прямо внутри функции посредством manager = SessionContextManager
async def create_something(manager: SessionContextManager, some_data: dict):
    async with manager:
        # Если внутри этого блока возникнет исключение блок завершится посредством __aexit__, следовательно сделается rollback
        some_id = await manager.session.execute(
            insert(SomeModel).values(some_data).returning(SomeModel.id)
        )
        # Выполнение метода commit является обязательным для того чтобы явно зафиксировать изменения
        await manager.commit()
    return some_id
→ Ссылка