Асинхронное получение статуса задачи Celery

Необходимо асинхронно получить статус задачи Celery. Попытался сделать это через run_in_executor, но поскольку для получения статуса используется property класса AsyncResult, то обернул вызов этого property в дополнительный метод get_state_async_not_work. В результате статус задачи всегда возвращается как PENDING.

Но, если напрямую вызывать приватные методы класса AsyncResult, то статус задачи возвращается правильный.

В чем может быть проблема? Как получить статус с помощью get_state_async_not_work?

server.py

from celery import Celery

app = Celery(broker='amqp://admin:admin@localhost:5672/default', backend='rpc')


@app.task(name='test')
def test():
    print('Test')


if __name__ == '__main__':
    app.worker_main(['worker'])

client.py

from asyncio import run, get_running_loop
from time import sleep

from server import test, app

task_id = test.delay().task_id
sleep(1)


def get_state(_task_id: str):
    print(app.AsyncResult(id=_task_id).state)


async def get_state_async(_task_id: str):
    loop = get_running_loop()
    res = await loop.run_in_executor(None, app.AsyncResult(id=_task_id)._get_task_meta)
    print(res['status'])


async def get_state_async_not_work(_task_id: str):
    loop = get_running_loop()
    res = await loop.run_in_executor(None, get_state, _task_id)


get_state(task_id)
run(get_state_async(task_id))
run(get_state_async_not_work(task_id))

Ответы (0 шт):