Threading | Большое кол-во трэдов в системе, которое постоянно увеличивается

Использую мультипоточность и с ней программа работаем более чем хорошо по функционалу. Но есть одна проблема — создается большое число трэдов (более 2500К) даже в том случае, когда на вход функциям подается какое-то крошечное кол-во данных, т.е. когда нагрузки по сути никакой нет. Обратил внимание, что число трэдов увеличивается и не помогает даже перезагрузка системы. Пробовал чистить кеш питона, но безрезультатно.

Вариант с применением ThreadPoolExecutor мне не очень подходит, потому что как только я ограничиваю общее кол-во трэдов, то программа перестает правильно работать. Возможно, что я делаю что-то не так.

Подскажите, пожалуйста, как без вреда для функциональности и производительности снизить общее кол-во трэдов в системе, ну или по крайней сделать так, чтобы они не увеличивались по непонятной причине.

@log_exceptions
def main():
    # Schedule the clearing of sets at 5am every day
    # schedule.every().day.at("05:00").do(clear_sets)
 
    while True:
        threads = []
 
        with open("result.json", encoding = 'utf-8') as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                # Handle empty file exception
                logger.error("JSONDecodeError: Empty JSON file")
                time.sleep(60)
                continue
 
            for fixture in data:
                thread = threading.Thread(target = running, args = (fixture, "result"))
                threads.append(thread)
 
        with open("result_h2h.json", encoding = 'utf-8') as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                # Handle empty file exception
                logger.error("JSONDecodeError: Empty JSON file")
                time.sleep(60)
                continue
 
            for fixture in data:
                thread = threading.Thread(target = running, args = (fixture, "result_h2h"))
                threads.append(thread)
 
        for thread in threads:
            thread.start()
 
        for thread in threads:
            thread.join()
 
        # Run any pending scheduled jobs
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                logger.debug(e)
            time.sleep(60)
 
        # Delay for 60 seconds before the next iteration
        time.sleep(60)
 
 
@log_exceptions
def running(fixture, data_type):
    while True:
        thread = threading.Thread(target = first_half_0_5,
                                  args = (fixture, data_type, processed_0_5_ids, processed_0_5_ids_h2h)
                                  )
        threads = [thread]
        thread = threading.Thread(target = first_half_1_5,
                                  args = (fixture, data_type, processed_1_5_ids, processed_1_5_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = tb25fulltime, args = (
        fixture, data_type, processed_tb25fulltime_ids, processed_tb25fulltime_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = draw,
                                  args = (fixture, data_type, processed_draw_ids, processed_draw_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = no_draw,
                                  args = (fixture, data_type, processed_no_draw_ids, processed_no_draw_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = one_zero,
                                  args = (fixture, data_type, processed_one_zero_ids, processed_one_zero_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = home_zero,
                                  args = (fixture, data_type, processed_home_zero_ids, processed_home_zero_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = away_zero,
                                  args = (fixture, data_type, processed_away_zero_ids, processed_away_zero_ids_h2h)
                                  )
        threads.append(thread)
 
        thread = threading.Thread(target = alternation,
                                  args = (fixture, data_type, processed_alternation_ids, processed_alternation_ids_h2h)
                                  )
        threads.append(thread)
 
        for thread in threads:
            thread.start()
 
        for thread in threads:
            thread.join()
 
        # Delay for 60 seconds before the next iteration
        time.sleep(60)
 
 
if __name__ == "__main__":
    main()

Ответы (0 шт):