Threading | Большое кол-во трэдов в системе, которое постоянно увеличивается
Использую мультипоточность и с ней программа работаем более чем хорошо по функционалу. Но есть одна проблема — создается большое число трэдов (более 2500К) даже в том случае, когда на вход функциям подается какое-то крошечное кол-во данных, т.е. когда нагрузки по сути никакой нет. Обратил внимание, что число трэдов увеличивается и не помогает даже перезагрузка системы. Пробовал чистить кеш питона, но безрезультатно.
Вариант с применением ThreadPoolExecutor мне не очень подходит, потому что как только я ограничиваю общее кол-во трэдов, то программа перестает правильно работать. Возможно, что я делаю что-то не так.
Подскажите, пожалуйста, как без вреда для функциональности и производительности снизить общее кол-во трэдов в системе, ну или по крайней сделать так, чтобы они не увеличивались по непонятной причине.
@log_exceptions
def main():
# Schedule the clearing of sets at 5am every day
# schedule.every().day.at("05:00").do(clear_sets)
while True:
threads = []
with open("result.json", encoding = 'utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError:
# Handle empty file exception
logger.error("JSONDecodeError: Empty JSON file")
time.sleep(60)
continue
for fixture in data:
thread = threading.Thread(target = running, args = (fixture, "result"))
threads.append(thread)
with open("result_h2h.json", encoding = 'utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError:
# Handle empty file exception
logger.error("JSONDecodeError: Empty JSON file")
time.sleep(60)
continue
for fixture in data:
thread = threading.Thread(target = running, args = (fixture, "result_h2h"))
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# Run any pending scheduled jobs
while True:
try:
schedule.run_pending()
except Exception as e:
logger.debug(e)
time.sleep(60)
# Delay for 60 seconds before the next iteration
time.sleep(60)
@log_exceptions
def running(fixture, data_type):
while True:
thread = threading.Thread(target = first_half_0_5,
args = (fixture, data_type, processed_0_5_ids, processed_0_5_ids_h2h)
)
threads = [thread]
thread = threading.Thread(target = first_half_1_5,
args = (fixture, data_type, processed_1_5_ids, processed_1_5_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = tb25fulltime, args = (
fixture, data_type, processed_tb25fulltime_ids, processed_tb25fulltime_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = draw,
args = (fixture, data_type, processed_draw_ids, processed_draw_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = no_draw,
args = (fixture, data_type, processed_no_draw_ids, processed_no_draw_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = one_zero,
args = (fixture, data_type, processed_one_zero_ids, processed_one_zero_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = home_zero,
args = (fixture, data_type, processed_home_zero_ids, processed_home_zero_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = away_zero,
args = (fixture, data_type, processed_away_zero_ids, processed_away_zero_ids_h2h)
)
threads.append(thread)
thread = threading.Thread(target = alternation,
args = (fixture, data_type, processed_alternation_ids, processed_alternation_ids_h2h)
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# Delay for 60 seconds before the next iteration
time.sleep(60)
if __name__ == "__main__":
main()