Ошибка asyncio при работе с БД: Future attached to a different loop

Именно при работе с БД, когда я запускаю asyncio.run в консоли пода или в самом приложении — первый раз запускается и отрабатывает нормально, на второй раз возникает эта ошибка:

    RuntimeError: Task <Task pending name='Task-7' coro=<test_db_tst() running at /app/test_asyncio_error.py:37> cb=[_run_until_complete_cb() at /usr/local/lib/python3.11/a
    syncio/base_events.py:181]> got Future <Future pending cb=[BaseProtocol._on_waiter_completed()]> attached to a different loop

на третий и последующие разы возникает ошибка БД:

sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: anot
her operation is in progress
[SQL: SELECT 1].

Подключение к БД абсолютно стандартное, никогда такого ранее не было:

    class Database:
        def __init__(self, host: str):
            try:
                self.url = host.split('@')[1]
            except IndexError:
                self.url = host
            self._engine = create_async_engine(host, echo=settings.DEBUG)
            self._session_factory = async_sessionmaker(bind=self._engine, expire_on_commit=False)
    
        @property
        def engine(self) -> AsyncEngine:
            return self._engine
    
        @property
        def session_factory(self) -> async_sessionmaker[AsyncSession]:
            return self._session_factory
    
    
    db_tst = Database(settings.db_url)
    
    async def test_db_tst():
        """Просто коннект работает, а вот execute уже рейзит ошибку"""
        _db_session = db_tst.session_factory()
        async with _db_session as session:
            await session.execute(text("SELECT 1"))
            await session.close()

Возможно конечно сам asyncio.run не применим к БД. Если так - подскажите тогда, как можно корректно запускать celery задачу, она у меня через celery-beat запускается каждые 15 минут, и после первого запуска всегда такая ошибка т.к. внутри задачи есть взаимодействие с БД:

# tasks.py:

@app.task
def send_notifications_task():
    asyncio.run(send_notifications_batch())

# app.py:
@signals.worker_process_init.connect
def setup_periodic_tasks(sender, **kwargs):
    asyncio.set_event_loop(asyncio.new_event_loop())


Хотя у меня в соседнем проекте всё абсолютно также сделано — этой ошибки нет


Кстати, если делаю db.engine.dispose() - возникает такая же ошибка, но после этого уже можно нормально запустить функцию, но только один раз, дальше всё как обычно


Ответы (1 шт):

Автор решения: hhhscvx

Добавил вот такой функционал взаимодействия с БД:

@asynccontextmanager
async def runtime_engine(db_url: str = settings.db_url):
    engine = create_async_engine(db_url, pool_pre_ping=True)
    try:
        yield engine
    finally:
        await engine.dispose()


def make_session_factory(engine) -> async_sessionmaker[AsyncSession]:
    return async_sessionmaker(engine, expire_on_commit=False)

Надеюсь подводных камней не будет

→ Ссылка