Event loop is closed
#celery_scheduler.py
from celery import Celery
from datetime import timedelta
from dotenv import load_dotenv
import os
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "app", ".env"))
scheduler = Celery('tg_messages', broker='redis://localhost:6379/0')
scheduler.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
scheduler.conf.beat_schedule = {
'send_newsletter': {
'task': 'celery_tasks.send_newsletter_task',
'schedule': timedelta(seconds=7),
'args': (3, ),
}
}
#celery_tasks
from httpx import AsyncClient
import celery
import asyncio
from aiogram import Bot
from celery_scheduler import scheduler
from app.database.models import Cities
from app.middleware.middleware import logger
from app.repositories.models import QueryFilter
from app.services.db_services import UserService
from app.telegram_bot.bot import send_newsletter_message
from app.utils.uow import Uow
from app.core.settings import settings
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class LoggedTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error(f"Schedule task - {task_id} failed: {exc}")
@scheduler.task(base=LoggedTask)
def send_newsletter_task(timezone):
loop.run_until_complete(send_newsletter(timezone))
async def send_newsletter(timezone):
async with Bot(settings.TELEGRAM_API) as bot:
city_ids = await UserService(Uow()).user_cities_by_timezone(Cities, 'city_id', 'id', (0, "city_id"),
QueryFilter(column='timezone', value=timezone))
for city_id in city_ids:
users_in_city: list[int] = await UserService(Uow()).select_users({'city_id': city_id, 'newsletter': True},
return_value='telegram_id')
async with AsyncClient() as client:
body = {'city_id': city_id, 'forecast_range': 'Прогноз на сегодня'}
forecast = await client.post('http://127.0.0.1:80/user/get_forecast', data=body)
forecast = forecast.json().get('forecast')
if not forecast:
logger.error("Failed to get forecast")
for user in users_in_city:
try:
await send_newsletter_message(user, forecast, tg_bot=bot)
except Exception as e:
logger.error(f"Error sending forecast: {e}\n")
Запускаю так: Файл main.py
celery -A celery_tasks worker --pool=solo
celery -A celery_tasks beat
Всё работает до тех пор, пока я не добавлю в файл celery_scheduler строки
#celery_scheduler.py
from app.database.models import Cities
from app.services.db_services import CitiesService, UserService
from app.utils.uow import Uow
from celery import Celery
from datetime import timedelta
from dotenv import load_dotenv
import os
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "app", ".env"))
loop=asyncio.new_event_loop()
loop.run_until_complete(CitiesService(Uow().get_unique('timezone')
После этого выходит ошибка что закрыт цикл событий, хотя вроде я же создаю отдельный и выполняю асинхронную функцию в нем, но видимо нет, меня уже больше интересует не решение проблемы, а почему так происходит, но и как её решить тоже Код полного проекта: https://github.com/DownKAX/weather1/tree/celery_postgre
Ответы (1 шт):
Автор решения: eri
→ Ссылка
Убери всё связанное с loop
.
loop.run_until_complete
замени на asyncio.run
, если версия позволяет. Этот метод сам создаст луп.