Использование pathos.multiprocessing.ProcessPool для функции в которой используется его же экземпляр

Есть ли возможность использовать pool.apipe на функцию в которой находится другой вызов apipe того же пула? В обеих случаях я использую пул для того, чтобы была возможность завершить функции, не дожидаясь их завершения. Вот как я это себе представлял (минимальный воспроизводимый пример):

from pathos.multiprocessing import ProcessingPool as Pool
from sympy import sympify
import pyautogui as auto
import keyboard as k

class Worker:
    def __init__(self):
        self.pool = Pool(2)
    def execute(self, code):
        def execute_start(code):
            def timeout_risk(val):
                return sympify(val, rational=True)
            def expr(val):
                val = eval(f"f'{val}'")
                task = self.pool.apipe(timeout_risk, val)
                res = task.get(timeout=0.25)
                return res
            try:
                exec(code)
                return 200
            except Exception as ex:
                return ex.args[0] if ex.args else type(ex)
        task = self.pool.apipe(execute_start, code)
        return task.get()
    def stop(self):
        self.pool.terminate()
        self.pool.restart()

def check(e):
    if k.is_pressed('ctrl+q'):
        executor.stop()
    elif k.is_pressed('ctrl+1'):
        res = executor.execute('while True:\n\tauto.moveTo(expr("10**2"), 0)')
        print(res)

if __name__ == '__main__':
    k.on_press(check)
    executor = Worker()
    k.wait()

Но при подобных попытках всё сводилось к daemonic processes are not allowed to have children. Помогите разобраться с данной проблемой. Не обязательно использовать именно pathos, главное чтобы концепция работала и пул создавался именно при объявлении класса. Если кто-то знает способ реализации лучше - буду рад посмотреть


Ответы (1 шт):

Автор решения: Pak Uula

Это сделано умышленно, дабы в случае ошибки не породилось бесконечное дерево процессов.

Но обойти ограничение возможно. Для этого нужно сделать два трюка:

  • создать тип процессов, которые игнорируют daemon атрибут
  • перехватить создание нового процесса для пула.
import logging

from multiprocess.pool import Pool as PythonPool
import multiprocess.context

import pathos.multiprocessing as mp

DefaultProcessFactory = PythonPool.Process

_clazz_cache = {}

class NoDaemonMixin:
    def _get_daemon(self):
        logging.getLogger("nodaemon.class").debug("NoDaemon._get_daemon()")
        return False
    def _set_daemon(self, value):
        logging.getLogger("nodaemon.class").debug("NoDaemon._set_daemon(%s) - ignored", value)
    daemon = property(_get_daemon, _set_daemon)

if hasattr(multiprocess.context, "SpawnProcess"):
    class NoDaemonSpawnProcess(NoDaemonMixin, multiprocess.context.SpawnProcess):
        pass
    _clazz_cache[multiprocess.context.SpawnProcess] = NoDaemonSpawnProcess

if hasattr(multiprocess.context, "ForkProcess"):
    class NoDaemonForkProcess(NoDaemonMixin, multiprocess.context.ForkProcess):
        pass
    _clazz_cache[multiprocess.context.ForkProcess] = NoDaemonForkProcess

if hasattr(multiprocess.context, "ForkServerProcess"):
    class NoDaemonForkServerProcess(NoDaemonMixin, multiprocess.context.ForkServerProcess):
        pass
    _clazz_cache[multiprocess.context.ForkServerProcess] = NoDaemonForkServerProcess

Этот код создаёт недемонические обёртки для классов, которые используются в стандартной библиотеке для разных стратегий порождения подпроцессов. Приходится проверять наличие классов, так как для разных ОС набор классов в multiprocess.context различается.

Теперь как перехватить порождение подпроцессов.

Внутри pathos.multiprocessing использует multiprocess.pool.Pool, поэтому нужно перехватить вызов статического метода Pool.Process который порождает новый процесс. Почему-то простой подмены контекста оказалось недостаточно - перестал работать маршалер параметров для pool.map

from multiprocess.pool import Pool as PythonPool

DefaultProcessFactory = PythonPool.Process


def _process(_, ctx, *args, **kwargs):
    log = logging.getLogger("nodaemon.factory")
    log.debug("MyPool.Process(): ctx=%s, args=%s, kwargs=%s", ctx, args, kwargs)
    if ctx.Process in _clazz_cache:
        wrap_clz = _clazz_cache[ctx.Process]
    else:
        raise TypeError(f"Unsupported process class: {ctx.Process}")
            
    try:
        log.debug("Creating process instance: process class %s", wrap_clz)
        return wrap_clz(*args, **kwargs)
    finally:
        log.debug("MyPool.Process() done")


class MyProcessingPool(mp.ProcessingPool):
    def _serve(self, nodes=None):
        log = logging.getLogger("nodaemon.pool")
        
        log.debug("MyProcessingPool._serve(nodes=%s)", nodes)

        restore = PythonPool.Process is not _process
        PythonPool.Process = _process
        try:
            return super(MyProcessingPool, self)._serve(nodes)
        finally:
            if restore:
                log.debug("Restoring PythonPool.Process factory")
                PythonPool.Process = DefaultProcessFactory

    def restart(self, force = False):
        log = logging.getLogger("nodaemon.pool")

        log.debug("MyProcessingPool.restart(force=%s)", force)

        restore = PythonPool.Process is not _process
        PythonPool.Process = _process
        try:
            return super().restart(force=force)
        finally:
            if restore:
                log.debug("Restoring PythonPool.Process factory")
                PythonPool.Process = DefaultProcessFactory

Как оно работает: (полный пример)

def sub_worker(x):
    print(f"sub_worker({x=})")
    return x * 2

def worker(max_num):
    print(f"worker({max_num=})")
    
    sub_pool = MyProcessingPool()
    try:
        return sub_pool.map(sub_worker, range(max_num))
    finally:
        sub_pool.close()
        sub_pool.join()
        
def main():
    # logging.basicConfig(level=logging.DEBUG)
    # logging.getLogger("nodaemon").setLevel(logging.INFO)
    pool = MyProcessingPool()
    result = pool.map(worker, [1, 2, 3, 4])
    print(result)
    pool.close()
    pool.join()

вывод:

worker(max_num=1)
worker(max_num=2)
worker(max_num=3)
worker(max_num=4)
sub_worker(x=0)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=2)
sub_worker(x=3)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=2)
[[0], [0, 2], [0, 2, 4], [0, 2, 4, 6]]

Тем не менее, несмотря на возможность порождения недемонических процессов в пуле, этим следует пользоваться очень, очень осторожно.

Во-первых, перехват фабрики проссов - это очень хрупко. Что-нибудь обязательно пойдёт не так, если несколько видов пулов будут использоваться одновременно.

Во-вторых, есть риск при небрежном порождении деток заспамить всю систему до полнейших тормозов или даже отправить её в перезагрузку.

→ Ссылка