Многопоточность с асинхронностью на python
У меня запускается 5 отдельных потоков, два из которых асинхронные, но из асинхронных выполняется только один, который стоит раньше другого, другой просто не выполняется. Код
Файл main.py
def run():
with ThreadPoolExecutor(max_workers=5) as executor:
executor.submit(get_rate)
executor.submit(bot.run)
executor.submit(log.run)
executor.submit(buff_async.run)
executor.submit(csm_get_cost.run())
if __name__ == '__main__':
run()
Файл buff_async.py
async def main():
guns_total_less_90_statTrak = read_json('BUFF/DataIds/guns_total_less_90_statTrak.json')
guns_total_less_90_without_statTrak = read_json('BUFF/DataIds/guns_total_less_90_without_statTrak.json')
guns_total_more_90_statTrak = read_json('BUFF/DataIds/guns_total_more_90_statTrak.json')
guns_total_more_90_without_statTrak = read_json('BUFF/DataIds/guns_total_more_90_without_statTrak.json')
knifes = read_json('BUFF/DataIds/knife.json')
hands = read_json('BUFF/DataIds/hands.json')
stickers = read_json('BUFF/DataIds/sticker.json')
unseparated_items = read_json('BUFF/DataIds/unseparated_items.json')
threading.Thread(target=get_cost_stickers).start()
tasks = [
process_group(guns_total_more_90_without_statTrak, sem_limit=850, proxies=config.proxies, group_name='guns_total_more_90_without_statTrak'),
# process_group(guns_total_less_90_without_statTrak, sem_limit=100, proxies=config.proxies_less, group_name='guns_total_less_90_without_statTrak'),
# process_group(guns_total_more_90_statTrak, sem_limit=50, proxies=config.proxies_more_st, group_name='guns_total_more_90_statTrak'),
# process_group(guns_total_less_90_statTrak, sem_limit=60, proxies=config.proxies_less_st, group_name='guns_total_less_90_statTrak'),
# process_group(knifes, sem_limit=10, proxies=config.proxies_other, group_name='knifes'),
# process_group(hands, sem_limit=10, proxies=config.proxies_other, group_name='hands'),
# process_group(stickers, sem_limit=25, proxies=config.proxies_other, group_name='stickers'),
# process_group(unseparated_items, sem_limit=10, proxies=config.proxies_other, group_name='unseparated_items')
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
pass
def run():
asyncio.run(main())
Файл csm_get_cost.py
async def main():
data = {}
data.update(read_json('../BUFF/DataIds/guns_total_less_90_statTrak.json'))
data.update(read_json('../BUFF/DataIds/guns_total_less_90_without_statTrak.json'))
data.update(read_json('../BUFF/DataIds/guns_total_more_90_statTrak.json'))
data.update(read_json('../BUFF/DataIds/guns_total_more_90_without_statTrak.json'))
data.update(read_json('../BUFF/DataIds/knife.json'))
data.update(read_json('../BUFF/DataIds/hands.json'))
data.update(read_json('../BUFF/DataIds/sticker.json'))
data.update(read_json('../BUFF/DataIds/unseparated_items.json'))
tasks = [
process_task(data)
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
pass
def run():
asyncio.run(main())
Вопрос: как сделать, чтобы эти две асинхронности запускались параллельно друг другу?
Ответы (2 шт):
В общем, решил проблему. Когда код запускается, две асинхронные функции main из buff_async.py и csm_get_cost.py выполняются последовательно, а не одновременно. Это происходит потому, использовался asyncio.run() в каждом файле, который создает новый цикл событий и запускает асинхронные функции в нем, но первое ожидает завершения второго события. Чтобы решить проблему, надо использовать событийный цикл (asyncio) в основном потоке.
Вот переделанный код:
async def run_buff_async():
await buff_async.main()
async def run_csm_get_cost():
await csm_get_cost.main()
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
loop.create_task(run_buff_async()),
loop.create_task(run_csm_get_cost())
]
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(get_rate)
executor.submit(bot.run)
executor.submit(log.run)
loop.run_until_complete(asyncio.gather(*tasks))
if __name__ == '__main__':
run()
async def run_buff_async():
await buff_async.main()
async def run_csm_get_cost():
await csm_get_cost.main()
async as_run()
await asyncio.gather(run_buff_async(), run_csm_get_cost())
def run():
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(get_rate)
executor.submit(bot.run)
executor.submit(log.run)
asyncio.run(as_run())
if __name__ == '__main__':
run()