Python. Dispy library. How to run more tasks on a cluster than the number of processors?

I am writing a Python program that distributes calculations across several PCs (to speed up data processing). I chose the dispy/dispycos library and managed to run the test program below on two PCs.

Problem: when I switched to the practical task of processing text data, the program could not load the CPU cores to 100% (only about 20%). The cause is that rtask() runs no more than 24 tasks at a time: 12 per PC, one per core.

I tried the io_rtask() function (which has no limit on the number of concurrent tasks), but ran into another problem: with io_rtask(), only one server is used, chosen at random. So I can now run more tasks on a PC (server), but they all land on that one PC while the second one is ignored.

Question: how do I run a given number of tasks on all PCs, regardless of the number of cores? For example, how do I run 50 tasks on 24 cores so that they all compute simultaneously instead of waiting in a queue?
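To make the desired behaviour concrete, here is a plain-Python sketch of the scheduling I am after. `round_robin_assign` is a hypothetical helper of my own, not part of the dispycos API: every task gets a server, cycling through the server list, so no server sits idle while another holds all 50 tasks.

```python
def round_robin_assign(njobs, servers):
    """Map each job index to a server in round-robin order."""
    return {i: servers[i % len(servers)] for i in range(njobs)}

# 50 jobs over 2 servers: 25 each, instead of all 50 on one random server
assignment = round_robin_assign(50, ['PC-1', 'PC-2'])
per_server = {s: sum(1 for v in assignment.values() if v == s)
              for s in ('PC-1', 'PC-2')}
print(per_server)  # prints {'PC-1': 25, 'PC-2': 25}
```

This is only the assignment logic; what I cannot find is the dispycos call that applies it when submitting tasks.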

An example program (the original program is large, so here I replaced the text processing with a simple computation loop):

# Run 'dispycosnode.py' program to start processes to execute computations sent
# by this client, along with this program.

# Distributed computing example where this client sends computation ('compute'
# function) to remote dispycos servers to run as remote tasks and obtain
# results. At any time at most one computation task is scheduled at a process,
# as the computation is supposed to be CPU heavy (although in this example they
# are not).


# this generator function is sent to remote dispycos servers to run tasks there
# test function with PC calculation
def compute(i, task=None):
    yield 0  # a dispycos computation is a generator function, so at least one 'yield' is required
    import numpy as np
    # CPU-bound busy loop standing in for the real text processing
    for _ in range(50000):
        a = np.random.random([100 * 100])
        b = np.random.random([100 * 100])
        c = a * b
    return (i, 0, c[:10])



# -- code below is executed locally --

# client (local) task submits tasks
def client_proc(njobs, task=None):
    # schedule client with the scheduler; scheduler accepts one client
    # at a time, so if scheduler is shared, the client is queued until it
    # is done with already scheduled clients
    if (yield client.schedule()):
        raise Exception('Could not schedule client')

    # schedule tasks on dispycos servers
    rtasks = []
    for i in range(njobs):
        # computation is supposed to be CPU bound, so 'rtask' is used so that
        # at most one computation runs on a server core at any time; for mostly
        # idle computations, 'io_rtask' may be used to run more than one task
        # on a server at the same time

        # ---
        # POSSIBLE POINT OF ERROR
        # rtask = yield client.rtask(compute, i)  # spreads tasks over all PCs, but no more than one task per core
        rtask = yield client.io_rtask(compute, i)  # no per-core limit, but all tasks end up on one randomly chosen server
        # ---
        
        if isinstance(rtask, pycos.Task):
            rtasks.append(rtask)
        else:
            print('  ** rtask failed for %s' % i)
    
    # wait for results
    for rtask in rtasks:
        result = yield rtask()
        if isinstance(result, tuple) and len(result) == 3:
            print('  +++  result for %d from %s: %s' % result)
        elif isinstance(result, pycos.MonitorStatus):
            print('    ** rtask %s failed: %s with %s' % (rtask, result.type, result.value))
        else:
            print('    ** rtask %s failed' % rtask)

    # close client
    yield client.close()


if __name__ == '__main__':
    import sys, random
    import pycos
    import pycos.netpycos
    from pycos.dispycos import *

    # IP addresses of the PCs (nodes) running dispycosnode
    nodes = ['109.123.171.38', '109.123.171.39']
    client = Client([compute], nodes=nodes)
    # run 20 jobs
    pycos.Task(client_proc, 20)
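For reference, this is how I start the node processes before running the client. The `-c`/`--cpus` option of `dispycosnode.py` is described in the docs as limiting how many server processes (one per core) a node starts; as far as I can tell it only limits the count, and I have not found a way to set it above the number of physical cores:

```shell
# run on each PC before starting the client;
# -c / --cpus limits how many server processes the node starts
dispycosnode.py -c 12
```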
