fastapi блокировка выполнения запросов при вызове асинхронной функции

в проекте присутствует роут (def start_ml), который содержит в себе функцию (main_task) обрабатывающий видеопоток. видеопоток обрабатывается покадрово (последовательно). по мере обработки видео происходит асинхронное обновление данных в БД (postgres). пример кода:

@result.get("/start_algorithm/{uid}",
            tags=["execution"], status_code=200, responses=response_404_video)
async def start_ml(uid: str,  background_tasks: BackgroundTasks):
    query = (
        select(
            [
                video_table.c.id,
                video_table.c.path
            ]
        ).select_from(video_table).where(video_table.c.uid == uid)
    )
    data = await database.fetch_one(query)
    if data is None:
        raise HTTPException(status_code=404, detail="video not found")
    res = await main_task(data.path, uid, data.id)
    return res
async def main_task(path_video: str, uid: str, id: int):
    data = {
        "uid": uid,
        "path": path_video,
          ...
        }
    cap = cv2.VideoCapture(path_video)
    q_loading = video_table.update().where(video_table.c.uid == uid).values(status="loading")
    await database.fetch_one(q_loading)
    totalFrames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    for _ in range(int(totalFrames)):
        _, cv2_frame = cap.read()
       ...

    q = video_table.update().where(video_table.c.uid == uid).values(status="success")
    await database.fetch_one(q)
    return data

при запросе на роут весь сервис API не отвечает до тех пор, пока не завершится обработка видео. ка решить данную проблему? мои попытки:

  1. background_tasks.add_task(main_task, data.path, uid, data.id)

  2. x = threading.Thread(target=asyncio.run, args=(data.path, uid, data.id, database,))

в 1) все равно происходит ожидание

во 2) ошибка при выполнении asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress


Ответы (1 шт):

Автор решения: Max Gacrux

я реши проблему через loop.run_in_executor(None, lambda: asyncio.run(main_task(data.path, uid, data.id))). в main_task сделал явный connect к database. run_in_executor - создает отдельный поток для задачи, ему не доступно соединение к бд

→ Ссылка