Python. Библиотека Dispy. Как запустить большее количество задач на кластере, чем количество процессоров?
Пишу программу на языке Python для распределения вычислений на несколько ПК (пытаюсь ускорить обработку данных). Выбрал библиотеку Dispy. Вышло запустить тестовую программу на двух ПК (см. ниже).
Проблема:
При переходе на практическую задачу с предварительной обработкой текстовых данных, программа не может нагрузить ядра компьютеров на 100% (только около 20%). Проблема связана с тем, что функция rtask() запускает не более 24 задач, по 12 задач на ПК (соответствует количеству виртуальных ядер).
Пытался использовать функцию io_rtask() (без ограничения на количество задач), но возникла другая проблема. При использовании функции io_rtask() выбирается только один случайный сервер. То есть теперь можно запустить больше задач на ПК (сервере), но все задачи попадают на 1 ПК (сервер). При этом сервер выбирается случайно, второй игнорируется.
Вопрос: Как запустить заданное количество задач не зависимо от количества ядер и на всех ПК? Например, как запустить 50 задач на 24 ядрах, чтобы они считали одновременно и не стояли в очереди вычислений.
Пример программы (исходная программа большая, поэтому обработку текстов заменил на простой цикл с вычислениями):
# Run 'dispycosnode.py' program to start processes to execute computations sent
# by this client, along with this program.
# Distributed computing example where this client sends computation ('compute'
# function) to remote dispycos servers to run as remote tasks and obtain
# results. At any time at most one computation task is scheduled at a process,
# as the computation is supposed to be CPU heavy (although in this example they
# are not).
# this generator function is sent to remote dispycos servers to run tasks there
# тестовая функция с вычислениями на ПК
def compute(i, task=None):
yield 0 # пока не разобрался как избавиться от строки, нужна, чтобы работало
import numpy as np
import time
for ii in range(50000):
a = np.random.random([100 * 100])
b = np.random.random([100 * 100])
c = a*b
return (i, 0, c[:10])
# -- code below is executed locally --
# client (local) task submits tasks
def client_proc(njobs, task=None):
# schedule client with the scheduler; scheduler accepts one client
# at a time, so if scheduler is shared, the client is queued until it
# is done with already scheduled clients
if (yield client.schedule()):
raise Exception('Could not schedule client')
# schedule tasks on dispycos servers
rtasks = []
for i in range(njobs):
# computation is supposed to be CPU bound so 'rtask' is used so at most
# one computations runs at a server at any time; for mostly idle
# computations, 'io_rtask' may be used to run more than one task at a
# server at the same time.
# ---
# ВОЗМОЖНОЕ МЕСТО ОШИБКИ
#rtask = yield client.rtask(compute, i) # распределяет задачи на все ПК, но не более чем количество ядер
rtask = yield client.io_rtask(compute, i) # распределяет все задачи независимо от количества ядер, но почему-то на случайный сервер (остальные игнорирует)
# ---
if isinstance(rtask, pycos.Task):
rtasks.append(rtask)
else:
print(' ** rtask failed for %s' % i)
# wait for results
for rtask in rtasks:
result = yield rtask()
if isinstance(result, tuple) and len(result) == 3:
print(' +++ result for %d from %s: %s' % result)
elif isinstance(result, pycos.MonitorStatus):
print(' ** rtask %s failed: %s with %s' % (rtask, result.type, result.value))
else:
print(' ** rtask %s failed' % rtask)
# close client
yield client.close()
if __name__ == '__main__':
import sys, random
import pycos
import pycos.netpycos
from pycos.dispycos import *
# package client fragments
nodes = ['109.123.171.38', '109.123.171.39'] # '109.123.171.38', '109.123.171.39'
client = Client([compute], nodes=nodes)
# run 20 jobs
pycos.Task(client_proc, 20)