Многопоточность с асинхронностью на python

У меня запускается 5 отдельных потоков, два из которых асинхронные, но из асинхронных выполняется только один, который стоит раньше другого, другой просто не выполняется. Код

Файл main.py

def run():
    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.submit(get_rate)
        executor.submit(bot.run)
        executor.submit(log.run)
        executor.submit(buff_async.run)
        executor.submit(csm_get_cost.run())


if __name__ == '__main__':
    run()

Файл buff_async.py

async def main():
    guns_total_less_90_statTrak = read_json('BUFF/DataIds/guns_total_less_90_statTrak.json')
    guns_total_less_90_without_statTrak = read_json('BUFF/DataIds/guns_total_less_90_without_statTrak.json')
    guns_total_more_90_statTrak = read_json('BUFF/DataIds/guns_total_more_90_statTrak.json')
    guns_total_more_90_without_statTrak = read_json('BUFF/DataIds/guns_total_more_90_without_statTrak.json')
    knifes = read_json('BUFF/DataIds/knife.json')
    hands = read_json('BUFF/DataIds/hands.json')
    stickers = read_json('BUFF/DataIds/sticker.json')
    unseparated_items = read_json('BUFF/DataIds/unseparated_items.json')

    threading.Thread(target=get_cost_stickers).start()

    tasks = [
        process_group(guns_total_more_90_without_statTrak, sem_limit=850, proxies=config.proxies, group_name='guns_total_more_90_without_statTrak'),
        # process_group(guns_total_less_90_without_statTrak, sem_limit=100, proxies=config.proxies_less, group_name='guns_total_less_90_without_statTrak'),
        # process_group(guns_total_more_90_statTrak, sem_limit=50, proxies=config.proxies_more_st, group_name='guns_total_more_90_statTrak'),
        # process_group(guns_total_less_90_statTrak, sem_limit=60, proxies=config.proxies_less_st, group_name='guns_total_less_90_statTrak'),
        # process_group(knifes, sem_limit=10, proxies=config.proxies_other, group_name='knifes'),
        # process_group(hands, sem_limit=10, proxies=config.proxies_other, group_name='hands'),
        # process_group(stickers, sem_limit=25, proxies=config.proxies_other, group_name='stickers'),
        # process_group(unseparated_items, sem_limit=10, proxies=config.proxies_other, group_name='unseparated_items')
    ]

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        pass


def run():
    asyncio.run(main())

Файл csm_get_cost.py

async def main():
    data = {}
    data.update(read_json('../BUFF/DataIds/guns_total_less_90_statTrak.json'))
    data.update(read_json('../BUFF/DataIds/guns_total_less_90_without_statTrak.json'))
    data.update(read_json('../BUFF/DataIds/guns_total_more_90_statTrak.json'))
    data.update(read_json('../BUFF/DataIds/guns_total_more_90_without_statTrak.json'))
    data.update(read_json('../BUFF/DataIds/knife.json'))
    data.update(read_json('../BUFF/DataIds/hands.json'))
    data.update(read_json('../BUFF/DataIds/sticker.json'))
    data.update(read_json('../BUFF/DataIds/unseparated_items.json'))
    tasks = [
        process_task(data)
    ]

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        pass


def run():
    asyncio.run(main())

Вопрос: как сделать, чтобы эти две асинхронности запускались параллельно друг другу?


Ответы (2 шт):

Автор решения: Mr. Nazy

В общем, решил проблему. Когда код запускается, две асинхронные функции main из buff_async.py и csm_get_cost.py выполняются последовательно, а не одновременно. Это происходит потому, использовался asyncio.run() в каждом файле, который создает новый цикл событий и запускает асинхронные функции в нем, но первое ожидает завершения второго события. Чтобы решить проблему, надо использовать событийный цикл (asyncio) в основном потоке.

Вот переделанный код:

async def run_buff_async():
    await buff_async.main()


async def run_csm_get_cost():
    await csm_get_cost.main()


def run():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [
        loop.create_task(run_buff_async()),
        loop.create_task(run_csm_get_cost())
    ]

    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.submit(get_rate)
        executor.submit(bot.run)
        executor.submit(log.run)
        loop.run_until_complete(asyncio.gather(*tasks))


if __name__ == '__main__':
    run()
→ Ссылка
Автор решения: Сергей Ш
async def run_buff_async():
    await buff_async.main()
  
async def run_csm_get_cost():
    await csm_get_cost.main()

async as_run()
    await asyncio.gather(run_buff_async(), run_csm_get_cost())

def run():
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.submit(get_rate)
        executor.submit(bot.run)
        executor.submit(log.run)
    asyncio.run(as_run())

if __name__ == '__main__':
    run()
→ Ссылка