- ВКонтакте
- РћРТвЂВВВВВВВВнокласснРСвЂВВВВВВВВРєРСвЂВВВВВВВВ
- РњРѕР№ Р В Р’В Р РЋРЎв„ўР В Р’В Р РЋРІР‚ВВВВВВВВРЎР‚
- Viber
- Skype
- Telegram
Использование pathos.multiprocessing.ProcessPool для функции в которой используется его же экземпляр
Есть ли возможность использовать pool.apipe на функцию в которой находится другой вызов apipe того же пула? В обеих случаях я использую пул для того, чтобы была возможность завершить функции, не дожидаясь их завершения. Вот как я это себе представлял (минимальный воспроизводимый пример):
from pathos.multiprocessing import ProcessingPool as Pool
from sympy import sympify
import pyautogui as auto
import keyboard as k
class Worker:
def __init__(self):
self.pool = Pool(2)
def execute(self, code):
def execute_start(code):
def timeout_risk(val):
return sympify(val, rational=True)
def expr(val):
val = eval(f"f'{val}'")
task = self.pool.apipe(timeout_risk, val)
res = task.get(timeout=0.25)
return res
try:
exec(code)
return 200
except Exception as ex:
return ex.args[0] if ex.args else type(ex)
task = self.pool.apipe(execute_start, code)
return task.get()
def stop(self):
self.pool.terminate()
self.pool.restart()
def check(e):
if k.is_pressed('ctrl+q'):
executor.stop()
elif k.is_pressed('ctrl+1'):
res = executor.execute('while True:\n\tauto.moveTo(expr("10**2"), 0)')
print(res)
if __name__ == '__main__':
k.on_press(check)
executor = Worker()
k.wait()
Но при подобных попытках всё сводилось к daemonic processes are not allowed to have children
. Помогите разобраться с данной проблемой. Не обязательно использовать именно pathos, главное чтобы концепция работала и пул создавался именно при объявлении класса. Если кто-то знает способ реализации лучше - буду рад посмотреть
Ответы (1 шт):
Это сделано умышленно, дабы в случае ошибки не породилось бесконечное дерево процессов.
Но обойти ограничение возможно. Для этого нужно сделать два трюка:
- создать тип процессов, которые игнорируют
daemon
атрибут - перехватить создание нового процесса для пула.
import logging
from multiprocess.pool import Pool as PythonPool
import multiprocess.context
import pathos.multiprocessing as mp
DefaultProcessFactory = PythonPool.Process
_clazz_cache = {}
class NoDaemonMixin:
def _get_daemon(self):
logging.getLogger("nodaemon.class").debug("NoDaemon._get_daemon()")
return False
def _set_daemon(self, value):
logging.getLogger("nodaemon.class").debug("NoDaemon._set_daemon(%s) - ignored", value)
daemon = property(_get_daemon, _set_daemon)
if hasattr(multiprocess.context, "SpawnProcess"):
class NoDaemonSpawnProcess(NoDaemonMixin, multiprocess.context.SpawnProcess):
pass
_clazz_cache[multiprocess.context.SpawnProcess] = NoDaemonSpawnProcess
if hasattr(multiprocess.context, "ForkProcess"):
class NoDaemonForkProcess(NoDaemonMixin, multiprocess.context.ForkProcess):
pass
_clazz_cache[multiprocess.context.ForkProcess] = NoDaemonForkProcess
if hasattr(multiprocess.context, "ForkServerProcess"):
class NoDaemonForkServerProcess(NoDaemonMixin, multiprocess.context.ForkServerProcess):
pass
_clazz_cache[multiprocess.context.ForkServerProcess] = NoDaemonForkServerProcess
Этот код создаёт недемонические обёртки для классов, которые используются в стандартной библиотеке для разных стратегий порождения подпроцессов. Приходится проверять наличие классов, так как для разных ОС набор классов в multiprocess.context
различается.
Теперь как перехватить порождение подпроцессов.
Внутри pathos.multiprocessing
использует multiprocess.pool.Pool
, поэтому нужно перехватить вызов статического метода Pool.Process
который порождает новый процесс. Почему-то простой подмены контекста оказалось недостаточно - перестал работать маршалер параметров для pool.map
from multiprocess.pool import Pool as PythonPool
DefaultProcessFactory = PythonPool.Process
def _process(_, ctx, *args, **kwargs):
log = logging.getLogger("nodaemon.factory")
log.debug("MyPool.Process(): ctx=%s, args=%s, kwargs=%s", ctx, args, kwargs)
if ctx.Process in _clazz_cache:
wrap_clz = _clazz_cache[ctx.Process]
else:
raise TypeError(f"Unsupported process class: {ctx.Process}")
try:
log.debug("Creating process instance: process class %s", wrap_clz)
return wrap_clz(*args, **kwargs)
finally:
log.debug("MyPool.Process() done")
class MyProcessingPool(mp.ProcessingPool):
def _serve(self, nodes=None):
log = logging.getLogger("nodaemon.pool")
log.debug("MyProcessingPool._serve(nodes=%s)", nodes)
restore = PythonPool.Process is not _process
PythonPool.Process = _process
try:
return super(MyProcessingPool, self)._serve(nodes)
finally:
if restore:
log.debug("Restoring PythonPool.Process factory")
PythonPool.Process = DefaultProcessFactory
def restart(self, force = False):
log = logging.getLogger("nodaemon.pool")
log.debug("MyProcessingPool.restart(force=%s)", force)
restore = PythonPool.Process is not _process
PythonPool.Process = _process
try:
return super().restart(force=force)
finally:
if restore:
log.debug("Restoring PythonPool.Process factory")
PythonPool.Process = DefaultProcessFactory
Как оно работает: (полный пример)
def sub_worker(x):
print(f"sub_worker({x=})")
return x * 2
def worker(max_num):
print(f"worker({max_num=})")
sub_pool = MyProcessingPool()
try:
return sub_pool.map(sub_worker, range(max_num))
finally:
sub_pool.close()
sub_pool.join()
def main():
# logging.basicConfig(level=logging.DEBUG)
# logging.getLogger("nodaemon").setLevel(logging.INFO)
pool = MyProcessingPool()
result = pool.map(worker, [1, 2, 3, 4])
print(result)
pool.close()
pool.join()
вывод:
worker(max_num=1)
worker(max_num=2)
worker(max_num=3)
worker(max_num=4)
sub_worker(x=0)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=2)
sub_worker(x=3)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=2)
[[0], [0, 2], [0, 2, 4], [0, 2, 4, 6]]
Тем не менее, несмотря на возможность порождения недемонических процессов в пуле, этим следует пользоваться очень, очень осторожно.
Во-первых, перехват фабрики проссов - это очень хрупко. Что-нибудь обязательно пойдёт не так, если несколько видов пулов будут использоваться одновременно.
Во-вторых, есть риск при небрежном порождении деток заспамить всю систему до полнейших тормозов или даже отправить её в перезагрузку.