Как распределить вычисления функции для 50 датафреймов pandas по 8 ядрам процессора?
Последовательная реализация:
import pandas as pd
import numpy as np
np.random.seed(0)
a = {} # Словарь для датафреймов
def func(dataset,prm):
dataset['func'] = dataset.quantile(prm)
for i in range(50): # Создаю 50 случайных датафреймов
a[f'df{i}'] = pd.DataFrame(np.random.rand(3000, 3000))
Запуск:
for i in range(50):
func(a[f'df{i}'],0.2)
Получается вычислительно долго. Хочу использовать все ядра. Пыталась через from multiprocessing import Pool, получаю разные ошибки.
Неработающее через multiprocessing из последнего:
from multiprocessing import Pool
import os
with Pool(processes=8) as pool:
for i in range(50):
pool.apply_async(func, (a[f'df{i}'],0.2,))
[pool.apply_async(os.getpid, ()) for k in range(8)]
Ответы (1 шт):
Автор решения: CrazyElf
→ Ссылка
Я вам просто покажу, как делать правильно. Основное тут:
- не забывайте использовать
if __name__ == "__main__":(хотя это в основном нужно для мультипроцессинга, а у меня мультитрединг, но лучше перестраховаться) - ничего не присваивайте в отдельном потоке/процессе - либо
GILбудет блокировать исполнение других потоков, либо всё что вы присвоите останется в том же процессе, это дело гиблое - лучше вместо этого возвращайте результат в место запроса
- и не забывайте ожидать результат, вы просто запускаете процессы и бросаете их на произвол судьбы, а нужно получать результат и присваивать его
- но сразу этот результат не присваивайте обратно в данные - снова будет или
GILмешать илиrace conditionслучится, подождите пока все обработчики завершатся - если данных много, то лучше используйте многопоточность, а не многопроцессность - помните, что каждый процесс получает полную копию данных в памяти, т.е. не только переданный вами датасет, но и весь ваш словарь датасетов будет сериализован и передан в порождённый процесс! это займёт кучу ресурсов и времени и таким образом весь выигрыш от многопроцессности вы потеряете на этой сериализации
import pandas as pd
import numpy as np
from multiprocessing.pool import ThreadPool as Pool
#from multiprocessing import Pool
import os
from tqdm.auto import tqdm
import datetime
if __name__ == "__main__":
n = 50
start = datetime.datetime.now()
np.random.seed(0)
a = {} # Словарь для датафреймов
def func(dataset,prm):
return dataset.quantile(prm)
for i in tqdm(range(n)): # Создаю 50 случайных датафреймов
a[f'df{i}'] = pd.DataFrame(np.random.rand(3000, 3000))
#for i in tqdm(range(n)):
# func(a[f'df{i}'],0.2)
b = {}
with Pool(processes=8) as pool:
results = [pool.apply_async(func, (a[f'df{i}'],0.2,)) for i in range(n)]
for i, r in enumerate(tqdm(results)):
b[f'df{i}'] = r.get()
for i in tqdm(range(n)):
a[f'df{i}']['func'] = b[f'df{i}']
print(datetime.datetime.now() - start)