Event loop is closed

#celery_scheduler.py
from celery import Celery
from datetime import timedelta
from dotenv import load_dotenv
import os
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "app", ".env"))


scheduler = Celery('tg_messages', broker='redis://localhost:6379/0')

scheduler.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

scheduler.conf.beat_schedule = {
    'send_newsletter': {
        'task': 'celery_tasks.send_newsletter_task',
        'schedule': timedelta(seconds=7),
        'args': (3, ),
    }
}
#celery_tasks
from httpx import AsyncClient
import celery
import asyncio
from aiogram import Bot

from celery_scheduler import scheduler
from app.database.models import Cities
from app.middleware.middleware import logger
from app.repositories.models import QueryFilter
from app.services.db_services import UserService
from app.telegram_bot.bot import send_newsletter_message
from app.utils.uow import Uow
from app.core.settings import settings

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

class LoggedTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Schedule task - {task_id} failed: {exc}")

@scheduler.task(base=LoggedTask)
def send_newsletter_task(timezone):
    loop.run_until_complete(send_newsletter(timezone))

async def send_newsletter(timezone):
    async with Bot(settings.TELEGRAM_API) as bot:
        city_ids = await UserService(Uow()).user_cities_by_timezone(Cities, 'city_id', 'id', (0, "city_id"),
                                                                    QueryFilter(column='timezone', value=timezone))
        for city_id in city_ids:
            users_in_city: list[int] = await UserService(Uow()).select_users({'city_id': city_id, 'newsletter': True},
                                                                             return_value='telegram_id')
            async with AsyncClient() as client:
                body = {'city_id': city_id, 'forecast_range': 'Прогноз на сегодня'}
                forecast = await client.post('http://127.0.0.1:80/user/get_forecast', data=body)
                forecast = forecast.json().get('forecast')
                if not forecast:
                    logger.error("Failed to get forecast")
            for user in users_in_city:
                try:
                    await send_newsletter_message(user, forecast, tg_bot=bot)
                except Exception as e:
                    logger.error(f"Error sending forecast: {e}\n")

Запускаю так: Файл main.py

celery -A celery_tasks worker --pool=solo
celery -A celery_tasks beat

Всё работает до тех пор, пока я не добавлю в файл celery_scheduler строки

#celery_scheduler.py
from app.database.models import Cities
from app.services.db_services import CitiesService, UserService
from app.utils.uow import Uow
from celery import Celery
from datetime import timedelta
from dotenv import load_dotenv
import os
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "app", ".env"))
loop=asyncio.new_event_loop()
loop.run_until_complete(CitiesService(Uow().get_unique('timezone')

После этого выходит ошибка что закрыт цикл событий, хотя вроде я же создаю отдельный и выполняю асинхронную функцию в нем, но видимо нет, меня уже больше интересует не решение проблемы, а почему так происходит, но и как её решить тоже Код полного проекта: https://github.com/DownKAX/weather1/tree/celery_postgre


Ответы (1 шт):

Автор решения: eri

Убери всё связанное с loop.

loop.run_until_complete замени на asyncio.run, если версия позволяет. Этот метод сам создаст луп.

→ Ссылка