Не запускаются отложенные задачи taskiq
Я решил использовать taskiq для выполнения отложенных задач. Весь код ниже писал по документации и другим сайтам, причем вот такой код:
await self.service_broker_tasks.stop_active_test.kiq(test_active_id=test_active[0].id)
Отрабатывает без проблем, но та же самая таска, запущенная через определенное время не отрабатывает
await self.service_broker_tasks.stop_active_test.schedule_by_time(
await FSBackground().get_redis_source(),
deactivate_date,
test_active[0].id,
test_active_id = test_active[0].id)
Ниже основной код, в котором создается планировщик и так далее.
import os
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend, RedisScheduleSource
from Shared.BaseShared.Singleton import Singleton
from Shared.BaseShared.config import load_config
config = load_config()
class FSBackground(metaclass=Singleton):
REDIS_URL = config.redis.redis_url
AMQP_URL = config.rabbit.rabbit_url
def __init__(self):
self.result_backend = RedisAsyncResultBackend(self.REDIS_URL)
self.taskiq_broker = AioPikaBroker(self.AMQP_URL).with_result_backend(self.result_backend)
self.redis_source = RedisScheduleSource(
url=self.REDIS_URL,
prefix="schedule",
buffer_size=50,
max_connection_pool_size=100
)
self.scheduler = TaskiqScheduler(
broker=self.taskiq_broker,
sources=[self.redis_source],
)
def get_taskiq_broker(self):
return self.taskiq_broker
async def get_redis_source(self):
await self.redis_source.startup()
return self.redis_source
def get_scheduler(self):
return self.scheduler
def get_result_backend(self):
return self.result_backend
В этом классе содержатся все таски.
import logging
from fastapi import HTTPException, Depends
from Shared.BaseShared.FSBackground import FSBackground
from Services.TestActive.model import TestActive
from Shared.Session.session import AsyncDatabase
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BrokerTasks:
taskiq_broker = FSBackground().get_taskiq_broker()
result_tasks = FSBackground().get_result_backend()
@staticmethod
@taskiq_broker.task
async def stop_active_test(test_active_id: str, db = Depends(AsyncDatabase.get_session)):
logger.info(f"work?")
test_active = await db.get(TestActive, test_active_id)
if not test_active:
raise HTTPException(400, "test active not found")
elif test_active.active is False:
raise HTTPException(400, "test already not active")
else:
test_active.active = False
await db.commit()
Ну и функция в которой я вызываю(устанавливаю?) отложенную таску.
async def start_unactive(
self, test_id: str, teacher_id: str, test_time: int, in_spread: bool
):
test_active = await self.check_active_test(
test_id, teacher_id
)
if test_active:
if test_active[0].active is False:
deactivate_date = datetime.now() + timedelta(minutes=test_time)
test_active[0].active = True
test_active[0].in_spread = in_spread
test_active[0].deactivate_date = deactivate_date
await self.session.commit()
logger.info(f"Scheduling stop_active_test for test_active_id: {test_active[0].id}")
#await self.service_broker_tasks.stop_active_test.kiq(test_active_id=test_active[0].id)
print(deactivate_date)
await self.service_broker_tasks.stop_active_test.schedule_by_time(
await FSBackground().get_redis_source(),
deactivate_date,
test_active[0].id,
test_active_id = test_active[0].id)
logger.info(f"Scheduling stop_active_test for test_active_id: {test_active[0].id} end")
return 200