Многопоточность с multiprocessing + json
Программа используется для огромного кол-ва итераций. Для этого использовал библиотеку multiprocessing. Создал несколько параллельных потоков для большей эффективности, но при записи данных в json файл и их обработки, ломается сам json-файл, в которые записываются данные. Из-за того, что файл открыт сразу в нескольких потоках, он и сохраняется и сразу изменяется и т.д. Получается так, что в одном потоке json-файл открыт, а в другом уже изменился и сохранился. Появляются лишние [] и {} скобки, из-за чего файл становится не читабельный и все это приводит к ошибке: json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0). Пытался использовать multiprocessing.Lock(), но все равно не помогает, файл все также ломается. Вот код запуска программы:
def process_item_wrapper(args):
process_item(*args)
def main():
start_time = time.time()
items_list = read_json('winfo.json')
collections_ = read_json('Collections.json')
name_to_item = {item['name']: item for item in items_list}
num_cores = 8 # Установите количество доступных ядер процессора
with multiprocessing.Pool(processes=num_cores) as pool:
list(tqdm(pool.imap_unordered(process_item_wrapper, [(item, collections_, name_to_item, items_list) for item in items_list]), total=len(items_list)))
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")
if __name__ == '__main__':
main()
Проблемная часть кода:
lock = multiprocessing.Lock()
def check_on_max_percentage_change(percentage, info_dict):
with lock:
profitable_data = read_json('profitable.json')
if not profitable_data[0]['max-profitable']:
profitable_data[0]['max-profitable'].append({'percentage': percentage, 'data': info_dict})
else:
for i in range(len(profitable_data[0]['max-profitable'])):
if profitable_data[0]['max-profitable'][i]['percentage'] < percentage:
profitable_data[0]['max-profitable'].insert(i, {'percentage': percentage, 'data': info_dict})
break
else:
profitable_data[0]['max-profitable'].append({'percentage': percentage, 'data': info_dict})
break
if len(profitable_data[0]['max-profitable']) > 10:
del profitable_data[0]['max-profitable'][10:]
write_json('profitable.json', profitable_data)
def get_most_profitable_contracts(info_dict):
cost_craft = info_dict["item"]["price"] * 1 + info_dict["additional_item"]["price"] * 9
profits = []
chances = []
percentage_arr = []
for item in range(len(info_dict['dropped_items'])):
percentage = round(calculate_percentage_change(cost_craft, info_dict["dropped_items"][item]["price"]), 2)
percentage_arr.append(percentage)
profits.append(percentage)
chances.append(round(info_dict["dropped_items"][item]["chance"]))
check_on_max_percentage_change(max(percentage_arr), info_dict)
weights = [chance / 100 for chance in chances]
weighted_average = sum(p * w for p, w in zip(profits, weights)) / sum(weights)
info_dict['profitable'] = weighted_average
return info_dict
Ответы (2 шт):
Как минимум, вы неправильно работаете с локом. Вот пример из документации:
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Тут очень важно, что lock объект инстанциируется один раз и передаётся аргументом во все процессы. У вас же, если вы предоставили релевантный код, на каждый процесс создаётся свой отдельный lock и, конечно, он в этом случае ни от чего не защищает. lock конкретного процесса ничего не знает об остальных lock других процессов.
Вот изменённый и рабочий вариант кода:
def main():
start_time = time.time()
items_list = read_json('winfo.json')
collections_ = read_json('Collections.json')
name_to_item = {item['name']: item for item in items_list}
manager = multiprocessing.Manager()
lock = manager.RLock()
num_cores = 8 # Установите количество доступных ядер процессора
with multiprocessing.Pool(processes=num_cores) as pool:
list(tqdm(pool.imap_unordered(process_item_wrapper, [(item, collections_, name_to_item, items_list, lock) for item in items_list]), total=len(items_list)))
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")
Добавил в main создание блокировщика, и передаю его через аргументы функций до check_on_max_percentage_change(). Учту, что просто multiprocessing.RLock() не будет работать, т.к. RLock() не может быть общим процессом. Для этого использовал multiprocessing.Manager(), чтобы создать разделяемый объект между процессами.
def check_on_max_percentage_change(percentage, info_dict, lock):
with lock:
profitable_data = read_json('profitable.json')
if not profitable_data[0]['max-profitable']:
profitable_data[0]['max-profitable'].append({'percentage': percentage, 'data': info_dict})
else:
for i in range(len(profitable_data[0]['max-profitable'])):
if profitable_data[0]['max-profitable'][i]['percentage'] < percentage:
profitable_data[0]['max-profitable'].insert(i, {'percentage': percentage, 'data': info_dict})
break
else:
profitable_data[0]['max-profitable'].append({'percentage': percentage, 'data': info_dict})
break
if len(profitable_data[0]['max-profitable']) > 10:
del profitable_data[0]['max-profitable'][10:]
write_json('profitable.json', profitable_data)