Как записать результаты выполнения функций в потоках в массив?
Нашел код который демонстрирует работу многопоточности в python 3. Я хочу сделать чтобы в каждом потоке выполнилась моя функция и после завершения работы всех потоков я получил массив с результатами всех потоков
import queue
import threading
import time
# The queue for tasks
q = queue.Queue()
# Worker, handles each task
def worker():
while True:
item = q.get()
if item is None:
break
#здесь должна выполниться моя функция я записать результат в массив
result = subprocess.run(['command', 'arg1', 'arg2'], stdout=subprocess.PIPE)
q.task_done()
def start_workers(worker_pool=1000):
threads = []
for i in range(worker_pool):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
return threads
def stop_workers(threads):
# stop workers
for i in threads:
q.put(None)
for t in threads:
t.join()
def create_queue(task_items):
for item in task_items:
q.put(item)
if __name__ == "__main__":
# Dummy tasks
tasks = [item for item in range(10)]
# Start up your workers
workers = start_workers(worker_pool=10)
create_queue(tasks)
# Blocks until all tasks are complete
result = q.join()
print(result)
result2 = worker.join()
print(result2)
stop_workers(workers)
Ответы (3 шт):
Проще всего наверное сделать ещё одну очередь для хранения результатов (очереди потокобезопасны):
q = queue.Queue()
# Для хранения результатов
result = queue.Queue()
def worker():
...
# Помещаем результат в очередь
result.put(subprocess.run(['command', 'arg1', 'arg2'], stdout=subprocess.PIPE))
...
stop_workers(workers)
# Разбираем результаты
for item in result.queue:
print(item)
multiprocessing.dummy - интерфейс к Thread, повторяющий api multiprocessing. Функция map очень сильно упрощают обработку массивов.
from multiprocessing.dummy import Pool
pool = Pool(1000)
def runner(item):
...
result = ...
return result
if __name__ == "__main__":
results = pool.map(runner, range(10))
Очереди, ожидания, запуск воркеров уже реализованы.
Часто, удобно не ждать, пока абсолютно все задачи закончат выполнятся в пуле pool.map, и только потом обработать все результаты скопом, а начинать обрабатывать результат каждой из задач по отдельности, сразу по окончании ее выполнения, параллельно продолжающейся работе в пуле. pool.imap_unordered будет возвращать результаты не в том порядке, в котором мы добавляли задачи, а какая из задач быстрее закончит выполнятся, результат той первее и вернется. Таким образом можно приделать прогресбар, или ускорить обработку результатов.
Например ниже, в пуле потоков будет отослано 10 запросов в гугл, ответы на запросы будут сохранены в файлы. В выводе скрипта видно, что сохранение файлов(обработка результатов работы пула) происходит параллельно работе пула запросов.
import multiprocessing.pool, time, random, urllib.request
Headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}
def runner(word):
print(f"\n-> {time.strftime('%H:%M:%S', time.gmtime())} search {word}")
time.sleep(random.randint(1, 5))
with urllib.request.urlopen(urllib.request.Request(f'https://www.google.com/search?q={word}', headers=Headers)) as http_body:
return word, http_body.read()
if __name__ == '__main__':
[*search_words] = range(10)
with multiprocessing.pool.ThreadPool(processes=4) as pool:
for word, http_body in pool.imap_unordered(runner, search_words):
print(f"\n<- {time.strftime('%H:%M:%S', time.gmtime())} save {word}, len={len(http_body)}")
with open(f'{word}_http_body.html', 'wb') as f:
f.write(http_body)
out:
-> 20:55:38 search 0
-> 20:55:38 search 1
-> 20:55:38 search 2
-> 20:55:38 search 3
-> 20:55:41 search 4
<- 20:55:41 save 0, len=82176
-> 20:55:42 search 5
<- 20:55:42 save 2, len=78236
-> 20:55:43 search 6
<- 20:55:43 save 1, len=117082
-> 20:55:43 search 7
<- 20:55:43 save 3, len=147944
-> 20:55:44 search 8
<- 20:55:44 save 4, len=83972
-> 20:55:45 search 9
<- 20:55:45 save 5, len=72776
<- 20:55:46 save 8, len=138741
<- 20:55:48 save 6, len=83061
<- 20:55:49 save 7, len=114936
<- 20:55:49 save 9, len=191621