Почему отложенное задание от celery срабатывает через раз?

Я сделал таск, который, по логике, должен был удалять событие с истекшим временем. Но, когда таск проходит 2 раз после предыдущего раза, он вызывает одну и ту же ошибку с закрытием обработчика.

tasks/events.py

import datetime

from src.conf import  celery
from src.utils.events import delete_event, get_events_from_db
from src.utils.decorators import async_task

@async_task(celery)
async def delete_overdue_events() -> None:
    """Ця функція видаляє прострочені події"""

    events = await get_events_from_db()

    for event in events:
        if event.get('end_time') < datetime.date.today():
            await delete_event(event.get('id'))

Ошибка возникает именно в await get_events_from_db(), но я использовал эту функцию в других местах и все хорошо работало.

utils/events.py

from typing import List, Dict

from sqlalchemy import select, delete

from src.database import async_session, redis_client
from src.database.models import Event

async def get_events_from_db() -> List[Dict]:
    """Ця функція отримує усі найближчі події з бази даних."""

    async with async_session() as session:
        query = select(Event)
        result = await session.execute(query)

    return [item.as_dict() for item in result.scalars().all()]

async def delete_event(event_id: int) -> None:
    """Ця функція відповідає за видалення події з бази даних.

    Параметри:
        event_id: ідентифікатор події;
    """

    async with async_session() as session:
        stat = delete(Event).where(Event.id == event_id)

        await session.execute(stat)

database/initialize.py

import aioredis

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from src.conf.const import (
    DB_HOST, DB_NAME, DB_PASSWORD,
    DB_USER
)


DATABASE_URL = f'mysql+aiomysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'

engine = create_async_engine(url=DATABASE_URL, pool_recycle=3600)
async_session = async_sessionmaker(engine)

redis_client = aioredis.Redis()

conf/initialize.py

from celery import Celery
from celery.schedules import crontab

from src.conf.const import CELERY_RESULT_BACKEND, CELERY_BROKER_URL

celery = Celery(
    'tasks',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['src.tasks']
)

celery.conf.beat_schedule = {
    'check_schedules': {
        'task': 'src.tasks.schedule.check_schedule',
        'schedule': crontab(minute='*/35')
    },
    'check_overdue_events': {
        'task': 'src.tasks.events.delete_overdue_events',
        'schedule': crontab(minute='*/5')
    }
}

utils/decorator.py

from functools import wraps
from celery import Celery, Task
from typing import Any, Callable, Coroutine, ParamSpec, TypeVar
from asgiref import sync


_P = ParamSpec("_P")
_R = TypeVar("_R")


def async_task(app: Celery, *args: Any, **kwargs: Any):
    def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
        sync_call = sync.AsyncToSync(func)

        @app.task(*args, **kwargs)
        @wraps(func)
        def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            return sync_call(*args, **kwargs)

        return _decorated

    return _decorator

Этот декоратор я позаимствовал y @Fran.

Traceback

Task src.tasks.events.delete_overdue_events[2313881d-bdd8-4e01-8cb4-692247e7979d] raised unexpected: RuntimeError('unable to perform operation on <TCPTransport closed=True reading=False 0x7417cc00b6a0>; the handler is closed')
Traceback (most recent call last):
  File "/home/oleksandr/develop/bot/src/utils/events.py", line 24, in get_events_from_db
    result = await session.execute(query)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
    return self._execute_internal(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2236, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    result = conn.execute(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2356, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
    cursor.execute(statement, parameters)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/aiomysql.py", line 95, in execute
    return self.await_(self._execute_async(operation, parameters))
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/aiomysql.py", line 104, in _execute_async
    result = await self._cursor.execute(operation, parameters)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/cursors.py", line 239, in execute
    await self._query(query)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/cursors.py", line 457, in _query
    await conn.query(q)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 468, in query
    await self._execute_command(COMMAND.COM_QUERY, sql)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 724, in _execute_command
    self._write_bytes(prelude + sql[:chunk_size - 1])
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 669, in _write_bytes
    return self._writer.write(data)
  File "/usr/lib/python3.10/asyncio/streams.py", line 325, in write
    self._transport.write(data)
  File "uvloop/handles/stream.pyx", line 674, in uvloop.loop.UVStream.write
  File "uvloop/handles/handle.pyx", line 159, in uvloop.loop.UVHandle._ensure_alive
RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x7417cc00b6a0>; the handler is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/oleksandr/develop/bot/src/utils/decorators.py", line 48, in _decorated
    return sync_call(*args, **kwargs)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/asgiref/sync.py", line 254, in __call__
    return call_result.result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/asgiref/sync.py", line 331, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/home/oleksandr/develop/bot/src/tasks/events.py", line 12, in delete_overdue_events
    events = await get_events_from_db()
  File "/home/oleksandr/develop/bot/src/utils/events.py", line 22, in get_events_from_db
    async with async_session() as session:
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 1025, in close
    await greenlet_spawn(self.sync_session.close)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2507, in close
    self._close_impl(invalidate=False)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2576, in _close_impl
    transaction.close(invalidate)
  File "<string>", line 2, in close
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1408, in close
    transaction.close()
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2586, in close
    self._do_close()
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2724, in _do_close
    self._close_impl()
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2710, in _close_impl
    self._connection_rollback_impl()
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2702, in _connection_rollback_impl
    self.connection._rollback_impl()
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1129, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2356, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1127, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 698, in do_rollback
    dbapi_connection.rollback()
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/aiomysql.py", line 201, in rollback
    self.await_(self._connection.rollback())
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 398, in rollback
    await self._execute_command(COMMAND.COM_QUERY, "ROLLBACK")
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 724, in _execute_command
    self._write_bytes(prelude + sql[:chunk_size - 1])
  File "/home/oleksandr/.cache/pypoetry/virtualenvs/bot--mjci2_C-py3.10/lib/python3.10/site-packages/aiomysql/connection.py", line 669, in _write_bytes
    return self._writer.write(data)
  File "/usr/lib/python3.10/asyncio/streams.py", line 325, in write
    self._transport.write(data)
  File "uvloop/handles/stream.pyx", line 674, in uvloop.loop.UVStream.write
  File "uvloop/handles/handle.pyx", line 159, in uvloop.loop.UVHandle._ensure_alive
RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x7417cc00b6a0>; the handler is closed

Я понимаю, что проблема где-то в async with async_session() as session:, но я не понимаю, как ее решить. Может, кто-то сталкивался с этой проблемой и может мне помочь?


Ответы (2 шт):

Автор решения: AnnaBazueva

Первое:

в функции delete_event Вы не вызываете await session.commit()
При выполнении операций INSERT , UPDATE , DELETE вам нужно вызвать commit ,
чтобы эти изменения были зафиксированы.
Это особенно критично при асинхронном коде!

Второе (надо проверять):

Вы используете sync.AsyncToSync для преобразования асинхронной функции в синхронную. Это может быть проблемой, если ваша функция выполняет асинхронные операции, так как она может блокировать поток.
А у Вас в функции delete_overdue_events() есть await (может обернуть в task?)!
Убедитесь, что ваш код не вызывает блокировок.
(Это может иметь место, т.к. Вы заимствовали декоратор!
Ему надо уделить больше внимания.)

Третье (Проблема архитектуры):

  • У Вас есть декоратор для асинхронной функции delete_overdue_events();
  • в которой есть две точки await (вызываются асинхронные функции)
  • в каждой из вызываемых функций:
    get_events_from_db(), delete_event(event_id: int)
    через контекстный менеджер открывается сессия БД
    и внутри присутствует точка await

При такой архитектуре трудно быть уверенным в том,
что все асинхронные операции будут завершены до закрытия сессии БД.

Попробуйте открывать сессию БД в декораторе, и передавать её как аргумент:

from functools import wraps
from celery import Celery, Task
from typing import Any, Callable, Coroutine, ParamSpec, TypeVar
from asgiref import sync
from sqlalchemy.ext.asyncio import async_sessionmaker

_P = ParamSpec("_P")
_R = TypeVar("_R")

# def async_task(app: Celery, session_factory: async_sessionmaker, *args: Any, **kwargs: Any):
#     def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
#         sync_call = sync.AsyncToSync(func)
# 
#         @app.task(*args, **kwargs)
#         @wraps(func)
#         async def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
#             async with session_factory() as session:
#                 # Передаем сессию в декорируемую функцию
#                 return await sync_call(*args, session=session, **kwargs)

#         return _decorated
#     return _decorator

def async_task(app: Celery, session_factory: async_sessionmaker, *args: Any, **kwargs: Any):
    def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
        @app.task(*args, **kwargs)
        @wraps(func)
        async def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            async with session_factory() as session:
                # Передаем сессию в декорируемую функцию
                return await func(*args, session=session, **kwargs)

        return _decorated

    return _decorator

подстройте функцию delete_overdue_events:

@async_task(celery, async_session)
async def delete_overdue_events(session: AsyncSession) -> None:
    """Эта функция проверяет дату события и удаляет просроченные."""
    events = await get_events_from_db(session)
    for event in events:
        if event.get('end_time') < datetime.datetime.today():
            await delete_event(session, event.get('id'))

и немного измените функции get_events_from_db и delete_event

async def get_events_from_db(session: AsyncSession) -> List[Dict]:
    async with session.begin():  # Используйте begin для управления транзакцией
        query = select(Event)
        result = await session.execute(query)
    return [item.as_dict() for item in result.scalars().all()]

async def delete_event(session: AsyncSession, event_id: int) -> None:
    stat = delete(Event).where(Event.id == event_id)
    await session.execute(stat)

Обратите внимание, что в delete_event не требуется await session.commit(),
т.к. в get_events_from_db используется begin для управления транзакцией.


Дополнение.
Эта ошибка:

Task src.tasks.events.delete_overdue_events[86e1a76c-c6ed-4d9a-a7e7-d5882dbf03ea] raised unexpected: EncodeError(TypeError('Object of type coroutine is not JSON serializable')) 

возникает со стороны Celery, т.к. ему требуется сериализация объектов для передачи.
Было бы полезно подробнее описать архитектуру проекта, и всё, что касается работы с Celery.

→ Ссылка
Автор решения: all time

Если кому-то будет полезно, то проблема была в декораторе async_task, который я использовал для выполнения асинхронных функций внутри таска.

Для решения моей проблемы мне помогло обернуть мою таску в asyncio.get_event_loop().run_until_complete(f(*args, **kwargs)) внутри декоратора.

Декоратор:

def sync(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
    return wrapper

Моя асинхронная таска:

@shared_task
@sync
async def delete_overdue_events() -> None:
    """Эта функция проверяет дату события и удаляет просроченные."""

    events = await get_events_from_db()

    for event in events:
        if event.get('end_time') < datetime.datetime.today():
            await delete_event(event.get('id'))

Ответ я нашел не сам, а с помощью другого ответа на вопрос от @Sergey Kozlovskiy на Stackoverflow, который я нашел здесь.

Надеюсь, кому-то этот ответ поможет.

P.S. Спасибо @AnnaBazueva - SPAM за помощь в решении этой проблемы.

→ Ссылка