Запуск нескольких задач одновременно в PyQt5

Я разрабатываю приложение на PyQt5 с использованием qasync. Допустим на форме есть некие настройки, поле для вывода лога и кнопки запустить/остановить. При выполнении одной задачи проблем не возникает: кнопка запуска и настройки становятся недоступными, кнопка 'остановить' становится доступной. В цикле поочередно выполняются задачи, результаты выводятся в лог, а при завершении выводятся итоговые результаты, кнопка запустить и настройки снова становятся доступными.

Я вкратце описала принцип работы для того, чтобы задать вопрос: как повторить эти же действия при выполнении всех задач одновременно?

Большинство кода остаётся таким же, глобальное отличие в цикле выполнения, допустим в поочерёдном выполнении было так:

for sender in senders:
    ...
    # Поочерёдное действие с каждым sender
    ...

А выполнение всех задач одновременно я написала вот так:

async def execute_senders(self):
    ...
    for sender in senders:
        ...
        loop.create_task(self.work_with_sender(sender))
        ...
    ...

...

async def work_with_sender(self, sender):
    ...
    # Действия с sender
    ...

В целом всё работает, но возникла проблема: у меня не получилось отслеживать все запущенные задачи и выводить итоговые результаты и также это порождает другую проблему: почему-то выполнение может зависнуть, даже на том моменте, когда никаких операций не выполняется, а выполняется условное присваивание значений переменным.

По сути это же шаблон для среднестатистического приложения: выставляются параметры, нажимается кнопка запуск, идёт обработка данных и в конце выводится итоговый результат. Но примеров как это сделать на PyQt5 я не нашла. Моё решение с create_task в принципе работает и даёт ожидаемый результат, но вышеперечисленные проблемы мне не удалось решить.

Воспроизводимый пример:

import random
import qasync
import asyncio
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from datetime import datetime


check_exe = True
success_req, error_req = 0, 0


class Window(QWidget):
    def __init__(self):
        QWidget.__init__(self)

        self.text_edit = QTextEdit()
        self.settings_button = QPushButton('Настройки')
        self.start_button = QPushButton('Запустить')
        self.stop_button = QPushButton('Остановить')

        self.start_button.clicked.connect(self.start_button_clicked)
        self.stop_button.clicked.connect(self.stop_button_clicked)
        self.stop_button.setEnabled(False)

        layout = QVBoxLayout()
        layout.addWidget(self.settings_button)
        layout.addWidget(self.start_button)
        layout.addWidget(self.stop_button)
        layout.addWidget(self.text_edit)
        self.setLayout(layout)

    @qasync.asyncSlot()
    async def start_button_clicked(self, *args, **kwargs):
        global check_exe, success_req, error_req
        all_delays = 0
        check_exe = True
        success_req, error_req = 0, 0
        senders = ['data', 'test', 'user']
        requests = [['connect', 'disconnect'], ['send', 'get'], ['connect', 'get']]

        self.update_log('Запуск...')

        self.stop_button.setEnabled(True)
        self.settings_button.setEnabled(False)
        self.start_button.setEnabled(False)

        count = 0
        loop = asyncio.get_event_loop()
        for sender in senders:
            if check_exe:
                delay = random.randint(1, 5)
                all_delays += delay
                loop.create_task(self.execute_query(sender, requests[count], delay))
                count += 1
            else:
                break

        if check_exe:
            await asyncio.sleep(all_delays)
            self.update_log(f'Ожидание {all_delays} секунд...')

        self.stop_button.setEnabled(False)
        self.settings_button.setEnabled(True)
        self.start_button.setEnabled(True)

        self.update_log('Обработка завершена.')
        self.update_log(f'Успешно отправлено: {success_req}.')
        self.update_log(f'Возникло ошибок: {error_req}.')

    def stop_button_clicked(self):
            global check_exe
            check_exe = False
            self.update_log('Принудительная остановка выполнения...')

    async def execute_query(self, sender, requests, delay, thread):
        global success_req, error_req
        self.update_log(f' (поток {thread}) ~ Обработка отправителя {sender}...')
        if check_exe:
            for query in requests:
                if check_exe:
                    self.update_log(f'(поток {thread}) Выполнение запроса {query} отправителем {sender}...')
                    if bool(random.randint(0, 1)):
                        success_req += 1
                        self.update_log(f'(поток {thread}) Запрос {query} успешно выполнен отправителем {sender}.')
                    else:
                        error_req += 1
                        self.update_log(f'(поток {thread}) Ошибка при выполненни запроса {query} отправителем {sender}.')
                    await asyncio.sleep(delay)
                    self.update_log(f'(поток {thread}) Ожидание {delay} секунд...')
                else:
                    break
        self.update_log(f'(поток {thread}) ~ Отправитель {sender} завершил работу.')

    def update_log(self, text):
        now = datetime.now()
        log_date = f"[{now.hour}:{now.minute}:{now.second}] "
        self.text_edit.append(f'{log_date}{text}\n')
        self.text_edit.moveCursor(QTextCursor.End)


async def main():
    asyncio.get_event_loop()
    future = asyncio.Future()

    app = QApplication.instance()
    app.setStyle("fusion")
    app.window = Window()
    app.window.resize(800, 600)
    app.window.show()

    await future
    return True


if __name__ == '__main__':
    qasync.run(main())

Решения, которые я пробовала:

В примере я использовала суммарное ожидание всех задач, после чего сделала вывод результатов. Но это точно не решение, так как при реальном использовании помимо заданных задержек, нужно время на выполнение самой задачи. Этот фрагмент я добавила, чтобы примерно показать, что я хочу получить.

Попытка использования asyncio.gather: получались непредвиденные результаты, отследить завершение всех задач мне не удалось.

    ...
    # Создание списка списков с задачами отправителей
    all_senders_data = self.get_data_for_sender(senders, requests)

    tasks = []
    coroutines = [self.work_with_sender(sender) for sender in all_senders_data]

    for coro in asyncio.as_completed(coroutines):
        result = await coro
        tasks.append(asyncio.create_task(self.write_results(result)))

    await asyncio.gather(*tasks)

async def write_results(self, result):
    if result[0]:
        self.update_log(f' ~ Отправитель {result[1]} завершил работу.')
    else:
        self.update_log(f' ~ Отправитель {result[1]} не отработал.')

Пробовала решения из этого, этого и этого вопросов, но получала различные ошибки, например event loop already running, при использовании первого примера:

asyncio.ensure_future(async_foo())
...
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

В общем мне не удалось решить основную проблему - отследить завершение всех задач.


Ответы (1 шт):

Автор решения: eri

отслеживание задач

положи таски в массив и жди через один из примитивов ожидания

Ждать всех:

pending=[]
...
pending.append(loop.create_task(....))
...
await asyncio.wait(pending)

или дергать готовые таски по одному

while pending:
    done, pending = await asyncio.wait(pending, return_when=FIRST_COMPLETED)
    for task in done:
        result = await task

и можно добавить прерывания через таймаут:

while pending:
    try:
        done, pending = await asyncio.wait(pending, timeout=5, return_when=FIRST_COMPLETED)
    except asyncio.TimeoutError:
        print(f'осталось {len(pending)} тасков')
    else:
        for task in done:
            result = await task

А на зависания хорошо бы посмотреть дебагером. При использовании асинхронных задач в том же треде (qasync) интерфейс не будет отклиаться во время выполнения всех кусков програмы между словами await. Присваивание и простые операции не должны вызвать проблем, но на долгих циклах вставляй await asyncio.sleep(0).

→ Ссылка