Как отменить работающие потоки threading
Пытаюсь отменить запущеные потоки после достижения timeout в методе concurrent.futures.wait, но программа все равно работает 10 секунд указанные в if url == "url_3":
import concurrent.futures
from time import sleep, time
sources = ["url_1", "url_2", "url_3"]
def get_request_header(url: str) -> str:
if url == "url_1":
sleep(0.8)
if url == "url_2":
sleep(0.9)
if url == "url_3":
sleep(10)
return f"{url}-headers"
start = time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(get_request_header, url) for url in sources]
completed, uncompleted = concurrent.futures.wait(futures, timeout=1.5)
for future in uncompleted:
future.cancel()
for future in completed:
print(future.result())
print(time() - start)
Если принтануть и проверить состояние после cancel(), то оно все еще running и собственно он возвращает False
for future in uncompleted:
print(future)
print(future.cancel())
print(future)
Необходимо ограничить работу потоков через timeout параметр и что бы не завершенные потоки прекратили работу, что не так?
Также пробовал через as_completed, через map не предлагать.
Ответы (1 шт):
Как в коментах отметил @Amgarak запущенные задачи нельзя отменить.
Но что бы главный поток не блокировался, можно создать ThreadPoolExecutor без контекстного менеджера и явно вызвать в конце метод shutdown (не завершает выполнение уже запущенных задач) с параметрами (wait=False, cancel_futures=True)
start = time()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
try:
futures = [executor.submit(get_request_header, url) for url in sources]
completed, uncompleted = concurrent.futures.wait(futures, timeout=1.5)
finally:
executor.shutdown(wait=False, cancel_futures=True)
print(time() - start)
Output:
1.5036067962646484
UPD Есть нехороший лайфхак
Можно обернуть код в функцию и создать демонический поток
def task():
<предыдущий код>
thread = threading.Thread(target=task, daemon=True)
thread.start()
thread.join(1.5)
Далее залезть в модуль concurrent.futures.thread и закоментировать строчки ожидания выполения задач и тогда потоки выполняющие запущенные задачи будут отменены.
def _python_exit():
global _shutdown
with _global_shutdown_lock:
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
# for t, q in items:
# t.join()