Как завершить дочерние процессы запущенные запросом в асинхронном коде?

Веб-сервер при получении get запроса запускает три процесса. Задача заключается в корректном завершении дочерних процессов при получении сигнала SIGTERM в главном процессе (ctrl+C, завершить процесс в котором работает сервер). Я попытался решить вопрос так, как приведено в документации aiohttp (https://docs.aiohttp.org/en/stable/web_advanced.html#aiohttp-web-graceful-shutdown) - способ не сработал корректно. Пытался навесить обработчик на цикл событий - тоже глухо. Примеры сервера и клиента привел ниже в упрощенном виде;

Сервер

import asyncio
import os
import signal

from aiohttp import web
from multiprocessing import Process
from time import sleep

def multiprocessing_task(k):
    print(f'Запущен процесс - {k}')
    sleep(50)


process_1 = Process(target=multiprocessing_task, args=(1,), name='process_1')
process_2 = Process(target=multiprocessing_task, args=(2,), name='process_2')
process_3 = Process(target=multiprocessing_task, args=(3,), name='process_3')
processes = [process_1, process_2, process_3]


def start_processes():
    start = perf_counter()
    for process in processes:
        process.daemon = True
        process.start()
        print(f'Process id - {process.pid=}')

    for process in processes:
        process.join()
    # Еще один из способов, который также не сработал
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGTERM, handle_SIGTERM)
    


def handle_SIGTERM():
    print('SIGTERM is COMING')


async def hello(request):
    start_processes()
    return web.Response(text="Hello my friend!!!")


async def shutdown(app):
    print('SHUTDOWN')
    for process in processes:
        if not process.is_alive():
            print('Process is not alive')
            return
        print(f'{process.pid=}')
        process.terminate()

app = web.Application()
app.add_routes([web.get('/', hello)])

# Второй рекомендуемый документацией способ решения
app.on_shutdown.append(shutdown)

web.run_app(app, port=8314)

Клиент

import aiohttp
import asyncio


async def request():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://0.0.0.0:8314/') as resp:
            print(await resp.text())


asyncio.run(request())

Как я только не пытался решить эту задачу, корректно не получилось. Прошу дать совет куда рыть.


Ответы (1 шт):

Автор решения: eri
pool = concurrent.futures.ProcessPoolExecutor()

def start_processes():
    loop = asyncio.get_running_loop()
    pcs = []
    for args in [(1,),(2,),(3,)]:
        pcs.append(
            loop.run_in_executor(pool, multiprocessing_task, *args)
        )
    return pcs

async def hello(request):
    if not app.process_task:
        app.process_tasks = start_processes()
    return web.Response(text="Hello my friend!!!")

async def on_shutdown(app):
    for task in app.process_tasks:
        task.cancel()
    
app.process_tasks = []

app.on_shutdown.append(on_shutdown)

Я бы не стал вмешивать мультипроцессинг в асинкио, а сделал примерно так.

→ Ссылка