Как завершить дочерние процессы запущенные запросом в асинхронном коде?
Веб-сервер при получении get запроса запускает три процесса. Задача заключается в корректном завершении дочерних процессов при получении сигнала SIGTERM в главном процессе (ctrl+C, завершить процесс в котором работает сервер). Я попытался решить вопрос так, как приведено в документации aiohttp (https://docs.aiohttp.org/en/stable/web_advanced.html#aiohttp-web-graceful-shutdown) - способ не сработал корректно. Пытался навесить обработчик на цикл событий - тоже глухо. Примеры сервера и клиента привел ниже в упрощенном виде;
Сервер
import asyncio
import os
import signal
from aiohttp import web
from multiprocessing import Process
from time import sleep
def multiprocessing_task(k):
print(f'Запущен процесс - {k}')
sleep(50)
process_1 = Process(target=multiprocessing_task, args=(1,), name='process_1')
process_2 = Process(target=multiprocessing_task, args=(2,), name='process_2')
process_3 = Process(target=multiprocessing_task, args=(3,), name='process_3')
processes = [process_1, process_2, process_3]
def start_processes():
start = perf_counter()
for process in processes:
process.daemon = True
process.start()
print(f'Process id - {process.pid=}')
for process in processes:
process.join()
# Еще один из способов, который также не сработал
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, handle_SIGTERM)
def handle_SIGTERM():
print('SIGTERM is COMING')
async def hello(request):
start_processes()
return web.Response(text="Hello my friend!!!")
async def shutdown(app):
print('SHUTDOWN')
for process in processes:
if not process.is_alive():
print('Process is not alive')
return
print(f'{process.pid=}')
process.terminate()
app = web.Application()
app.add_routes([web.get('/', hello)])
# Второй рекомендуемый документацией способ решения
app.on_shutdown.append(shutdown)
web.run_app(app, port=8314)
Клиент
import aiohttp
import asyncio
async def request():
async with aiohttp.ClientSession() as session:
async with session.get('http://0.0.0.0:8314/') as resp:
print(await resp.text())
asyncio.run(request())
Как я только не пытался решить эту задачу, корректно не получилось. Прошу дать совет куда рыть.
Ответы (1 шт):
pool = concurrent.futures.ProcessPoolExecutor()
def start_processes():
loop = asyncio.get_running_loop()
pcs = []
for args in [(1,),(2,),(3,)]:
pcs.append(
loop.run_in_executor(pool, multiprocessing_task, *args)
)
return pcs
async def hello(request):
if not app.process_task:
app.process_tasks = start_processes()
return web.Response(text="Hello my friend!!!")
async def on_shutdown(app):
for task in app.process_tasks:
task.cancel()
app.process_tasks = []
app.on_shutdown.append(on_shutdown)
Я бы не стал вмешивать мультипроцессинг в асинкио, а сделал примерно так.