Почему отложенное задание от celery срабатывает через раз?
Я сделал таск, который, по логике, должен был удалять событие с истекшим временем. Но, когда таск проходит 2 раз после предыдущего раза, он вызывает одну и ту же ошибку с закрытием обработчика.
tasks/events.py
import datetime
from src.conf import celery
from src.utils.events import delete_event, get_events_from_db
from src.utils.decorators import async_task
@async_task(celery)
async def delete_overdue_events() -> None:
"""Ця функція видаляє прострочені події"""
events = await get_events_from_db()
for event in events:
if event.get('end_time') < datetime.date.today():
await delete_event(event.get('id'))
Ошибка возникает именно в await get_events_from_db()
, но я использовал эту функцию в других местах и все хорошо работало.
utils/events.py
from typing import List, Dict
from sqlalchemy import select, delete
from src.database import async_session, redis_client
from src.database.models import Event
async def get_events_from_db() -> List[Dict]:
"""Ця функція отримує усі найближчі події з бази даних."""
async with async_session() as session:
query = select(Event)
result = await session.execute(query)
return [item.as_dict() for item in result.scalars().all()]
async def delete_event(event_id: int) -> None:
"""Ця функція відповідає за видалення події з бази даних.
Параметри:
event_id: ідентифікатор події;
"""
async with async_session() as session:
stat = delete(Event).where(Event.id == event_id)
await session.execute(stat)
database/initialize.py
import aioredis
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from src.conf.const import (
DB_HOST, DB_NAME, DB_PASSWORD,
DB_USER
)
DATABASE_URL = f'mysql+aiomysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
engine = create_async_engine(url=DATABASE_URL, pool_recycle=3600)
async_session = async_sessionmaker(engine)
redis_client = aioredis.Redis()
conf/initialize.py
from celery import Celery
from celery.schedules import crontab
from src.conf.const import CELERY_RESULT_BACKEND, CELERY_BROKER_URL
celery = Celery(
'tasks',
broker=CELERY_BROKER_URL,
backend=CELERY_RESULT_BACKEND,
include=['src.tasks']
)
celery.conf.beat_schedule = {
'check_schedules': {
'task': 'src.tasks.schedule.check_schedule',
'schedule': crontab(minute='*/35')
},
'check_overdue_events': {
'task': 'src.tasks.events.delete_overdue_events',
'schedule': crontab(minute='*/5')
}
}
utils/decorator.py
from functools import wraps
from celery import Celery, Task
from typing import Any, Callable, Coroutine, ParamSpec, TypeVar
from asgiref import sync
_P = ParamSpec("_P")
_R = TypeVar("_R")
def async_task(app: Celery, *args: Any, **kwargs: Any):
def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
sync_call = sync.AsyncToSync(func)
@app.task(*args, **kwargs)
@wraps(func)
def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
return sync_call(*args, **kwargs)
return _decorated
return _decorator
Этот декоратор я позаимствовал y @Fran.
Traceback
Task src.tasks.events.delete_overdue_events[2313881d-bdd8-4e01-8cb4-692247e7979d] raised unexpected: RuntimeError('unable to perform operation on <TCPTransport closed=True reading=False 0x7417cc00b6a0>; the handler is closed')
Traceback (most recent call last):
File "/home/oleksandr/develop/bot/src/utils/events.py", line 24, in get_events_from_db
result = await session.execute(query)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
result = await greenlet_spawn(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
return self._execute_internal(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2236, in _execute_internal
result: Result[Any] = compile_state_cls.orm_execute_statement(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
result = conn.execute(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
return meth(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
return connection._execute_clauseelement(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
ret = self._execute_context(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
return self._exec_single_context(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
self._handle_dbapi_exception(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2356, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2])
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
cursor.execute(statement, parameters)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/aiomysql.py", line 95, in execute
return self.await_(self._execute_async(operation, parameters))
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
value = await result
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/aiomysql.py", line 104, in _execute_async
result = await self._cursor.execute(operation, parameters)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/cursors.py", line 239, in execute
await self._query(query)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/cursors.py", line 457, in _query
await conn.query(q)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 468, in query
await self._execute_command(COMMAND.COM_QUERY, sql)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 724, in _execute_command
self._write_bytes(prelude + sql[:chunk_size - 1])
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 669, in _write_bytes
return self._writer.write(data)
File "/usr/lib/python3.10/asyncio/streams.py", line 325, in write
self._transport.write(data)
File "uvloop/handles/stream.pyx", line 674, in uvloop.loop.UVStream.write
File "uvloop/handles/handle.pyx", line 159, in uvloop.loop.UVHandle._ensure_alive
RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x7417cc00b6a0>; the handler is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/celery/app/trace.py", line 453, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/celery/app/trace.py", line 736, in __protected_call__
return self.run(*args, **kwargs)
File "/home/oleksandr/develop/bot/src/utils/decorators.py", line 48, in _decorated
return sync_call(*args, **kwargs)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/asgiref/sync.py", line 254, in __call__
return call_result.result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/asgiref/sync.py", line 331, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/home/oleksandr/develop/bot/src/tasks/events.py", line 12, in delete_overdue_events
events = await get_events_from_db()
File "/home/oleksandr/develop/bot/src/utils/events.py", line 22, in get_events_from_db
async with async_session() as session:
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 1025, in close
await greenlet_spawn(self.sync_session.close)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2507, in close
self._close_impl(invalidate=False)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2576, in _close_impl
transaction.close(invalidate)
File "<string>", line 2, in close
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1408, in close
transaction.close()
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2586, in close
self._do_close()
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2724, in _do_close
self._close_impl()
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2710, in _close_impl
self._connection_rollback_impl()
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2702, in _connection_rollback_impl
self.connection._rollback_impl()
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1129, in _rollback_impl
self._handle_dbapi_exception(e, None, None, None, None)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2356, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2])
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1127, in _rollback_impl
self.engine.dialect.do_rollback(self.connection)
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 698, in do_rollback
dbapi_connection.rollback()
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/aiomysql.py", line 201, in rollback
self.await_(self._connection.rollback())
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
value = await result
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 398, in rollback
await self._execute_command(COMMAND.COM_QUERY, "ROLLBACK")
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 724, in _execute_command
self._write_bytes(prelude + sql[:chunk_size - 1])
File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 669, in _write_bytes
return self._writer.write(data)
File "/usr/lib/python3.10/asyncio/streams.py", line 325, in write
self._transport.write(data)
File "uvloop/handles/stream.pyx", line 674, in uvloop.loop.UVStream.write
File "uvloop/handles/handle.pyx", line 159, in uvloop.loop.UVHandle._ensure_alive
RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x7417cc00b6a0>; the handler is closed
Я понимаю, что проблема где-то в async with async_session() as session:
, но я не понимаю, как ее решить. Может, кто-то сталкивался с этой проблемой и может мне помочь?
Ответы (2 шт):
Первое:
в функции
delete_event
Вы не вызываетеawait session.commit()
При выполнении операцийINSERT
,UPDATE
,DELETE
вам нужно вызватьcommit
,
чтобы эти изменения были зафиксированы.
Это особенно критично при асинхронном коде!
Второе (надо проверять):
Вы используете sync.AsyncToSync
для преобразования асинхронной функции в синхронную. Это может быть проблемой, если ваша функция выполняет асинхронные операции, так как она может блокировать поток.
А у Вас в функции delete_overdue_events()
есть await
(может обернуть в task?)!
Убедитесь, что ваш код не вызывает блокировок.
(Это может иметь место, т.к. Вы заимствовали декоратор!
Ему надо уделить больше внимания.)
Третье (Проблема архитектуры):
- У Вас есть декоратор для асинхронной функции
delete_overdue_events()
;- в которой есть две точки
await
(вызываются асинхронные функции)- в каждой из вызываемых функций:
get_events_from_db()
,delete_event(event_id: int)
через контекстный менеджер открывается сессия БД
и внутри присутствует точкаawait
При такой архитектуре трудно быть уверенным в том,
что все асинхронные операции будут завершены до закрытия сессии БД.
Попробуйте открывать сессию БД в декораторе, и передавать её как аргумент:
from functools import wraps
from celery import Celery, Task
from typing import Any, Callable, Coroutine, ParamSpec, TypeVar
from asgiref import sync
from sqlalchemy.ext.asyncio import async_sessionmaker
_P = ParamSpec("_P")
_R = TypeVar("_R")
# def async_task(app: Celery, session_factory: async_sessionmaker, *args: Any, **kwargs: Any):
# def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
# sync_call = sync.AsyncToSync(func)
#
# @app.task(*args, **kwargs)
# @wraps(func)
# async def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
# async with session_factory() as session:
# # Передаем сессию в декорируемую функцию
# return await sync_call(*args, session=session, **kwargs)
# return _decorated
# return _decorator
def async_task(app: Celery, session_factory: async_sessionmaker, *args: Any, **kwargs: Any):
def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
@app.task(*args, **kwargs)
@wraps(func)
async def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
async with session_factory() as session:
# Передаем сессию в декорируемую функцию
return await func(*args, session=session, **kwargs)
return _decorated
return _decorator
подстройте функцию delete_overdue_events
:
@async_task(celery, async_session)
async def delete_overdue_events(session: AsyncSession) -> None:
"""Эта функция проверяет дату события и удаляет просроченные."""
events = await get_events_from_db(session)
for event in events:
if event.get('end_time') < datetime.datetime.today():
await delete_event(session, event.get('id'))
и немного измените функции get_events_from_db
и delete_event
async def get_events_from_db(session: AsyncSession) -> List[Dict]:
async with session.begin(): # Используйте begin для управления транзакцией
query = select(Event)
result = await session.execute(query)
return [item.as_dict() for item in result.scalars().all()]
async def delete_event(session: AsyncSession, event_id: int) -> None:
stat = delete(Event).where(Event.id == event_id)
await session.execute(stat)
Обратите внимание, что в delete_event
не требуется await session.commit()
,
т.к. в get_events_from_db
используется begin
для управления транзакцией.
Дополнение.
Эта ошибка:
Task src.tasks.events.delete_overdue_events[86e1a76c-c6ed-4d9a-a7e7-d5882dbf03ea] raised unexpected: EncodeError(TypeError('Object of type coroutine is not JSON serializable'))
возникает со стороны Celery, т.к. ему требуется сериализация объектов для передачи.
Было бы полезно подробнее описать архитектуру проекта, и всё, что касается работы с Celery.
Если кому-то будет полезно, то проблема была в декораторе async_task
, который я использовал для выполнения асинхронных функций внутри таска.
Для решения моей проблемы мне помогло обернуть мою таску в asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
внутри декоратора.
Декоратор:
def sync(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
return wrapper
Моя асинхронная таска:
@shared_task
@sync
async def delete_overdue_events() -> None:
"""Эта функция проверяет дату события и удаляет просроченные."""
events = await get_events_from_db()
for event in events:
if event.get('end_time') < datetime.datetime.today():
await delete_event(event.get('id'))
Ответ я нашел не сам, а с помощью другого ответа на вопрос от @Sergey Kozlovskiy на Stackoverflow, который я нашел здесь.
Надеюсь, кому-то этот ответ поможет.
P.S. Спасибо @AnnaBazueva - SPAM за помощь в решении этой проблемы.