Как запустить повторяющуюся блокирующую функцию в разных threads с помощью asyncio, python
Есть блокирующая функция, для примера возьмем:
def blockme(n):
x = random.random() * 2.0
time.sleep(x)
return n, x
(На самом деле мне нужны реквесты по API, но в контексте данного вопроса это не важно)
Мне надо запускать эту функцию в бесконечном цикле, причем каждый следующий запуск не должен дожидаться окончания работы предыдущих.
Как это сделать с помощью модуля threading я, в общих чертах, понимаю.
if __name__ == '__main__':
i = 0
while True:
i += 1
threading.Thread(target=blockme, args=(i,)).start()
Но у меня возникли вопросы, которые скорее носят исследовательский характер нежели практический:
- как это сделать с помощью
asyncio, так какasyncioтакже поддерживает треды и прочиеconcurrent.futures - что не менее важно, а стоит ли это делать с помощью
asyncio? Что дастasyncio? Простоту синтаксиса? Более читаемый код? - а вообще это возможно?
Я уже задал этот вопрос на Stackoverflow.com, правда с момента, как я его задал я уже получил ряд промежуточных знаний и поэтому вопрос модифицировался и отличается от оригинального. На данный момент, обсуждение на SO.com продолжается, задача не решена, но я получил следующую проблему. Один из участников предложил решение:
import asyncio
import random
import time
def blockme(n):
x = random.random() * 2.0
time.sleep(x)
return n, x
def cb(fut):
print("Result", fut.result())
async def main():
loop = asyncio.get_event_loop()
futs = []
for n in range(20):
fut = loop.run_in_executor(None, blockme, n)
fut.add_done_callback(cb)
futs.append(fut)
await asyncio.gather(*futs)
# await asyncio.sleep(10)
asyncio.run(main())
Тут он еще добавил футуры, подразумевая, что нам еще нужны результаты работы (для моей задачи это не обязательно, если ваше решение будет без футур - мне тоже подойдет). И в таком виде, как он написал - все как-будто работает. Но тут есть нюанс: он утверждает, что если заменить for n in range(20) на while True, то все тоже будет работать, но у меня так не работает. Код что-то начинает делать в бесконечном цикле, но никакого вывода результатов не происходит. У меня есть подозрение, что то, что лежит в теле цикла не запускает функцию, а лишь определяет и собирает футуры в список. И без await asyncio.gather(*futs) работать не будет, а значит, задачу не решает, так как бесконечный цикл невозможен.
Так что есть еще вопросы:
- А у вас этот код работает, если заменить ограниченый цикл на бесконечный?
- Если - "да", то есть варианты почему он не работает у меня? Проблема в окружении (версии модулей, настройки ОС и т.д.)?
PS.
На русском SO я нашел еще вот такой вопрос, он похож, но во-первых, нет речи про бесконечный цикл, а, во-вторых, предложенное решение, все равно использовало threading в явном виде и коллеги критиковали использование asyncio, но одно дело - критиковать, а другое - можно ли это в принципе сделать.
Ответы (2 шт):
Футуры выполняются при вызове await другой функции или при выходе в другую асинхронную функцию.
while True: записывай как while await asyncio.sleep(0, result=True): и тогда треды будут запускаться на каждом круге.
Экзекутор нужно определить чтоб ограничить количество тредов. По умолчанию количество ядер умноженное на 5.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=40)
...
while await asyncio.sleep(0, result=True):
fut = loop.run_in_executor(pool, blockme, n)
....
Но при этом футуры будут всё ещё набиваться в очередь. Не знаю что стоит за причиной запускать бесконечное число потоков, но тут надо бы какой семафор поставить чтоб не забивать. Хотя бы тупой
if len(futs)>40:
completed, futs = await asyncio.wait(futs, timeout=5)
С определенными оговорками ответ выглядит так
import asyncio
from asyncio import FIRST_COMPLETED
import random
import time
import concurrent
def blockme(n):
x = random.random() * 2.0
time.sleep(x)
return n, x
def cb(fut):
print("Result", fut.result())
async def main():
#Требуется контролировать количество потоков
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
loop = asyncio.get_event_loop()
futs_set = set()
n = 0
#Требуется контролировать количество активных футур
futures_limit = 40
#Требуется контролировать частоту запуска блокирующих функций
delay = 0.15
while True:
await asyncio.sleep(delay)
fut = loop.run_in_executor(pool, blockme, n)
fut.add_done_callback(cb)
futs_set.append(fut)
#Требуется организовать логику,
#для того, чтобы футуры не копились бесконечно, например:
if len(fut_set) >= futures_limit:
completed, fut_set = await asyncio.wait(
fut_set,
timeout=5,
return_when=FIRST_COMPLETED
)
asyncio.run(main())