Как распределить вычисления функции для 50 датафреймов pandas по 8 ядрам процессора?

Последовательная реализация:

import pandas as pd
import numpy as np

np.random.seed(0)
a = {} # Словарь для датафреймов

def func(dataset,prm):
    dataset['func'] = dataset.quantile(prm)

for i in range(50): # Создаю 50 случайных датафреймов
    a[f'df{i}'] = pd.DataFrame(np.random.rand(3000, 3000))

Запуск:

for i in range(50):
    func(a[f'df{i}'],0.2)

Получается вычислительно долго. Хочу использовать все ядра. Пыталась через from multiprocessing import Pool, получаю разные ошибки.

Неработающее через multiprocessing из последнего:

from multiprocessing import Pool
import os

with Pool(processes=8) as pool:
    for i in range(50):
        pool.apply_async(func, (a[f'df{i}'],0.2,))
        [pool.apply_async(os.getpid, ()) for k in range(8)]

Ответы (1 шт):

Автор решения: CrazyElf

Я вам просто покажу, как делать правильно. Основное тут:

  • не забывайте использовать if __name__ == "__main__": (хотя это в основном нужно для мультипроцессинга, а у меня мультитрединг, но лучше перестраховаться)
  • ничего не присваивайте в отдельном потоке/процессе - либо GIL будет блокировать исполнение других потоков, либо всё что вы присвоите останется в том же процессе, это дело гиблое
  • лучше вместо этого возвращайте результат в место запроса
  • и не забывайте ожидать результат, вы просто запускаете процессы и бросаете их на произвол судьбы, а нужно получать результат и присваивать его
  • но сразу этот результат не присваивайте обратно в данные - снова будет или GIL мешать или race condition случится, подождите пока все обработчики завершатся
  • если данных много, то лучше используйте многопоточность, а не многопроцессность - помните, что каждый процесс получает полную копию данных в памяти, т.е. не только переданный вами датасет, но и весь ваш словарь датасетов будет сериализован и передан в порождённый процесс! это займёт кучу ресурсов и времени и таким образом весь выигрыш от многопроцессности вы потеряете на этой сериализации
import pandas as pd
import numpy as np
from multiprocessing.pool import ThreadPool as Pool
#from multiprocessing import Pool
import os
from tqdm.auto import tqdm
import datetime

if __name__ == "__main__":
    n = 50
    start = datetime.datetime.now()
    np.random.seed(0)
    a = {} # Словарь для датафреймов

    def func(dataset,prm):
        return dataset.quantile(prm)

    for i in tqdm(range(n)): # Создаю 50 случайных датафреймов
        a[f'df{i}'] = pd.DataFrame(np.random.rand(3000, 3000))

    #for i in tqdm(range(n)):
    #    func(a[f'df{i}'],0.2)

    b = {}
    with Pool(processes=8) as pool:
        results = [pool.apply_async(func, (a[f'df{i}'],0.2,)) for i in range(n)]
        for i, r in enumerate(tqdm(results)):
            b[f'df{i}'] = r.get()
    for i in tqdm(range(n)):
        a[f'df{i}']['func'] = b[f'df{i}']
    print(datetime.datetime.now() - start)
→ Ссылка