Как отменить работающие потоки threading

Пытаюсь отменить запущеные потоки после достижения timeout в методе concurrent.futures.wait, но программа все равно работает 10 секунд указанные в if url == "url_3":

import concurrent.futures
from time import sleep, time

sources = ["url_1", "url_2", "url_3"]

def get_request_header(url: str) -> str:
    if url == "url_1":
        sleep(0.8)
    if url == "url_2":
        sleep(0.9)
    if url == "url_3":
        sleep(10)
    return f"{url}-headers"


start = time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    futures = [executor.submit(get_request_header, url) for url in sources]
    completed, uncompleted = concurrent.futures.wait(futures, timeout=1.5)

    for future in uncompleted:
        future.cancel()

for future in completed:
    print(future.result())

print(time() - start)

Если принтануть и проверить состояние после cancel(), то оно все еще running и собственно он возвращает False

for future in uncompleted:
    print(future)
    print(future.cancel())
    print(future)

Необходимо ограничить работу потоков через timeout параметр и что бы не завершенные потоки прекратили работу, что не так?

Также пробовал через as_completed, через map не предлагать.


Ответы (1 шт):

Автор решения: Maksim Alekseev

Как в коментах отметил @Amgarak запущенные задачи нельзя отменить.

Но что бы главный поток не блокировался, можно создать ThreadPoolExecutor без контекстного менеджера и явно вызвать в конце метод shutdown (не завершает выполнение уже запущенных задач) с параметрами (wait=False, cancel_futures=True)

start = time()

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
try:
    futures = [executor.submit(get_request_header, url) for url in sources]
    completed, uncompleted = concurrent.futures.wait(futures, timeout=1.5)
finally:
    executor.shutdown(wait=False, cancel_futures=True)

print(time() - start)

Output:

1.5036067962646484

UPD Есть нехороший лайфхак

Можно обернуть код в функцию и создать демонический поток

def task():
    <предыдущий код>

thread = threading.Thread(target=task, daemon=True)
thread.start()
thread.join(1.5)

Далее залезть в модуль concurrent.futures.thread и закоментировать строчки ожидания выполения задач и тогда потоки выполняющие запущенные задачи будут отменены.

def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    # for t, q in items:
    #     t.join()
→ Ссылка