Как сделать многопроцессорность?
Функция принимает список чисел и для каждого делает список из чисел на которые оно делится без остатка. Я пытаюсь реализовать передачу нескольких чисел, но получаю очень много ошибок. Объясните как можно в моем случае реализовать многопроцессорность, что я делаю неправильно и нужно ли использовать RLock если я уже использую Manager.list()?
import time
from multiprocessing import cpu_count, Process, Manager
def factorize(factors: list, shared_value: list, *number):
for num in number:
for i in range(1, num+1):
if num % i == 0:
factors.append(i)
shared_value.append(factors)
return shared_value
if __name__ == '__main__':
cpu_cores = cpu_count()
start_time = time.time()
with Manager() as manager:
processes = []
shared_list = manager.list()
shared_factors = manager.list()
for _ in range(cpu_cores+1):
pr = Process(target=factorize, args=(shared_factors, shared_list, 128, 255, 99999, 10651060, ))
pr.start()
processes.append(pr)
[el.join() for el in processes]
end_time = time.time()
running_time = end_time - start_time
print(list(shared_list))
print('Time:', running_time)
assert shared_list[0] == [1, 2, 4, 8, 16, 32, 64, 128]
assert shared_list[1] == [1, 3, 5, 15, 17, 51, 85, 255]
assert shared_list[2] == [1, 3, 9, 41, 123, 271, 369, 813, 2439, 11111, 33333, 99999]
assert shared_list[3] == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
1065106, 1521580, 2130212, 2662765, 5325530, 10651060]
Ответы (3 шт):
1 ошибка - процессы запускаются с одинаковыми начальными условиями. Работает полезно один, остальные греют воздух.
2 ошибка - менеджер закрыт.
Если хочешь чистый код для обработки последовательностей и списков - используй multiprocessing.Pool().imap(), в этом случае с чанками
Внешний цикл оставь в главном процессе, перенос его воркера не даёт преимуществ.
Проблема в том, что вы передаете один и тот же список factors в качестве аргумента для каждого процесса. Это означает, что каждый процесс будет пытаться добавить элементы в один и тот же список, что приведет к ошибке доступа к разделяемой памяти.
Чтобы решить эту проблему, вы можете создать отдельный список factors для каждого процесса внутри функции factorize(). Это можно сделать, например, путем изменения функции factorize() следующим образом:
def factorize(shared_factors: list, shared_list: list, *number):
for num in number:
factors = []
for i in range(1, num+1):
if num % i == 0:
factors.append(i)
shared_factors.append(factors)
shared_list.extend(shared_factors)
return shared_list
Здесь мы создаем список factors внутри функции factorize() для каждого процесса, а затем добавляем его в общий список shared_factors. Затем мы добавляем все элементы из shared_factors в общий список shared_list с помощью метода extend(). Возвращаемым значением функции является общий список shared_list.
Чтобы ответить на ваш вопрос о RLock, если вы используете Manager.list(), то вам не нужно использовать RLock, потому что Manager.list() уже обеспечивает безопасный доступ к разделяемому списку между процессами. Однако, если вы вместо этого используете обычный список, то вам нужно будет использовать RLock для синхронизации доступа к списку между процессами.
Наконец, если вы хотите передавать несколько чисел, то вы можете просто передать их в виде аргументов после shared_list. Например, чтобы передать числа 128, 255, 99999 и 10651060, вы можете вызвать функцию factorize() следующим образом:
factorize(shared_factors, shared_list, 128, 255, 99999, 10651060)
Ошибка в том что вы обращаетесь к shared_list когда manager уже уничтожен:
if __name__ == '__main__':
...
with Manager() as manager:
...
shared_list = manager.list()
...
[el.join() for el in processes] # дочерние процессы ещё работают
# а manager уже уничтожен
...
print(list(shared_list)) # переменная зависит от нерабочего manager
...
Код можно исправить так:
if __name__ == '__main__':
with Manager() as manager:
cpu_cores = cpu_count()
# весь код должен быть внутри контекста manager
...
assert shared_list[3] == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
1065106, 1521580, 2130212, 2662765, 5325530, 10651060]
P.S. RLock не нужен. Ваш код вполне работоспособен без него.
P.P.S. Я говорил только про техническую ошибку. Код не разделяет работу между процессами - все делают одно и то же. Если поставить цель доработать ваш код, нужно ввести очередь из которой процессы будут выбирать задачи и решать их. Это не лучший способ, но рабочий:
import time
import queue
from multiprocessing import cpu_count, Process, Manager
def factorize(queue_, numbers, factorizations):
while True:
try:
k = queue_.get_nowait()
except queue.Empty:
break
num = numbers[k]
factors = factorizations[k]
for i in range(1, num + 1):
if num % i == 0:
factors.append(i)
if __name__ == '__main__':
with Manager() as manager:
start_time = time.time()
numbers = manager.list([128, 255, 99999, 10651060])
factorizations = manager.list([manager.list() for _ in numbers])
queue_ = manager.Queue()
for k in range(len(numbers)):
queue_.put(k)
processes = [
Process(target=factorize, args=(queue_, numbers, factorizations))
for _ in range(cpu_count() + 1)
]
for p in processes:
p.start()
for p in processes:
p.join()
end_time = time.time()
running_time = end_time - start_time
print(*map(list, factorizations))
print('Time:', running_time)
assert list(factorizations[0]) == [1, 2, 4, 8, 16, 32, 64, 128]
assert list(factorizations[1]) == [1, 3, 5, 15, 17, 51, 85, 255]
assert list(factorizations[2]) == [1, 3, 9, 41, 123, 271, 369, 813, 2439, 11111, 33333, 99999]
assert list(factorizations[3]) == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
1065106, 1521580, 2130212, 2662765, 5325530, 10651060]
P.P.P.S. Pool.map - готовое решение для вашей задачи. Он сам запустит процессы по числу процессоров, распределит числа между процессами, подождёт окончания вычислений, соберёт результаты в общий список:
import time
from multiprocessing import Pool
def factorize(num):
factors = []
for i in range(1, num + 1):
if num % i == 0:
factors.append(i)
return factors
if __name__ == '__main__':
start_time = time.time()
with Pool() as pool:
factorizations = pool.map(factorize, [128, 255, 99999, 10651060])
end_time = time.time()
running_time = end_time - start_time
print(*factorizations)
print('Time:', running_time)
assert factorizations[0] == [1, 2, 4, 8, 16, 32, 64, 128]
assert factorizations[1] == [1, 3, 5, 15, 17, 51, 85, 255]
assert factorizations[2] == [1, 3, 9, 41, 123, 271, 369, 813, 2439, 11111, 33333, 99999]
assert factorizations[3] == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
1065106, 1521580, 2130212, 2662765, 5325530, 10651060]