Ошибка asyncio при работе с БД: Future attached to a different loop
Именно при работе с БД, когда я запускаю asyncio.run в консоли пода или в самом приложении — первый раз запускается и отрабатывает нормально, на второй раз возникает эта ошибка:
RuntimeError: Task <Task pending name='Task-7' coro=<test_db_tst() running at /app/test_asyncio_error.py:37> cb=[_run_until_complete_cb() at /usr/local/lib/python3.11/a
syncio/base_events.py:181]> got Future <Future pending cb=[BaseProtocol._on_waiter_completed()]> attached to a different loop
на третий и последующие разы возникает ошибка БД:
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: anot
her operation is in progress
[SQL: SELECT 1].
Подключение к БД абсолютно стандартное, никогда такого ранее не было:
class Database:
def __init__(self, host: str):
try:
self.url = host.split('@')[1]
except IndexError:
self.url = host
self._engine = create_async_engine(host, echo=settings.DEBUG)
self._session_factory = async_sessionmaker(bind=self._engine, expire_on_commit=False)
@property
def engine(self) -> AsyncEngine:
return self._engine
@property
def session_factory(self) -> async_sessionmaker[AsyncSession]:
return self._session_factory
db_tst = Database(settings.db_url)
async def test_db_tst():
"""Просто коннект работает, а вот execute уже рейзит ошибку"""
_db_session = db_tst.session_factory()
async with _db_session as session:
await session.execute(text("SELECT 1"))
await session.close()
Возможно конечно сам asyncio.run
не применим к БД. Если так - подскажите тогда, как можно корректно запускать celery
задачу, она у меня через celery-beat
запускается каждые 15 минут, и после первого запуска всегда такая ошибка т.к. внутри задачи есть взаимодействие с БД:
# tasks.py:
@app.task
def send_notifications_task():
asyncio.run(send_notifications_batch())
# app.py:
@signals.worker_process_init.connect
def setup_periodic_tasks(sender, **kwargs):
asyncio.set_event_loop(asyncio.new_event_loop())
Хотя у меня в соседнем проекте всё абсолютно также сделано — этой ошибки нет
Кстати, если делаю db.engine.dispose()
- возникает такая же ошибка, но после этого уже можно нормально запустить функцию, но только один раз, дальше всё как обычно
Ответы (1 шт):
Добавил вот такой функционал взаимодействия с БД:
@asynccontextmanager
async def runtime_engine(db_url: str = settings.db_url):
engine = create_async_engine(db_url, pool_pre_ping=True)
try:
yield engine
finally:
await engine.dispose()
def make_session_factory(engine) -> async_sessionmaker[AsyncSession]:
return async_sessionmaker(engine, expire_on_commit=False)
Надеюсь подводных камней не будет