Код долго думает после 4 символа

import asyncio
import string
import itertools
import time


PASSWORD = 'zxcgg'


async def check_password(attempt):
    print(f"Пробуем: {attempt}")

    if attempt == PASSWORD:
        print(f"Пароль найден: {attempt}")
        return True
    else:
        return False


async def async_bruteforce():
    characters = string.ascii_lowercase
    for length in range(1, len(PASSWORD) + 1):
        tasks = []
        for attempt in itertools.product(characters, repeat=length):
            attempt = ''.join(attempt)
            task = asyncio.create_task(check_password(attempt))
            tasks.append(task)

        # Ждём завершения всех задач текущей длины
        results = await asyncio.gather(*tasks)
        if any(results):
            return


print("\n Асинхронный перебор пароля:")
async_start_time = time.time()
asyncio.run(async_bruteforce())
print(f"Время асинхронного перебора: {time.time() - async_start_time:.2f} секунд")

Ответы (2 шт):

Автор решения: Pak Uula

Ваш код очень сильно задумывается вот в этом месте: results = await asyncio.gather(*tasks), где вы сначала асинхронно запускаете почти двенадцать миллионов задач, а затем собираете их результаты в список (26**5 = 11 881 376).

Ваш код аллоцирует отдельного исполнителя под каждую задачу, и запускает исполнителя ровно один раз. Так можно поступать, но не нужно. Гораздо практичнее использовать пул многоразовых исполнителей, которым главный процесс раздаёт задачи и собирает с них результаты. Для этого в asyncio есть специальный примитив asyncio.Queue, с помощью которого шаблон producer/consumer реализуется легко и непринуждённо:

# главный процесс
async def task_builder(max_length: int, tasks: asyncio.Queue, password: ResutlStorage[str]):
    """Генератор пробных строк для подбора пароля"""
    print("task_builder: Начало работы")
    for attempt in iterate_passwords(max_length):
        await tasks.put(attempt)
        if password.has_result():
            print(f"task_builder: Пароль найден: {password.get_result()}")
            print("task_builder: Завершение работы")
            return

# Работники
async def task_executor(num: int, tasks: asyncio.Queue, password: ResutlStorage[str]):
    """Потребитель пробных строк, выполняет проверку пароля.
    
    Параметры:
    num -- номер работника
    tasks -- очередь задач
    password -- хранилище результата
    """
    print(f"task_executor {num}: Начало работы")
    while True:
        if password.has_result():
            print(f"task_executor {num}: Пароль найден другим работником")
            print(f"task_executor {num}: Завершение работы")
            return
        attempt = await tasks.get()
        print(f"task_executor {num}: Пробуем: {attempt}")
        if await check_password(attempt):
            print(f"task_executor {num}: Пароль найден: {attempt}")
            password.set_result(attempt)
            print(f"task_executor {num}: Завершение работы")
            return

Главный процесс перебирает пароли и отправляет их работникам на проверку. Тот работник, который нашел пароль, записывает его в специальный регистр. Регистр, конечно, не очень хорошее решение. Идеал требует, чтобы работники и главный процесс общались только через message-passing, и не имели бы общее состояние, но когда хочется - то можно ))

Полное решение здесь: https://pastebin.com/A5FcvcTh Поиск пароля на core i7 занимает 31 секунду. Но... как вы, безусловно, знаете, asyncio только на словах асинхронная библиотека. Даже с ключевым словом async Пайтон продолжает выполняться в один поток. Поэтому это решение только прикидывается параллельным, оставаясь в глубине реализации сугубо синхронным однопоточным. Настоящая параллельность в Пайтоне реализуется только через multiprocessing. И там тем более нужно использовать шаблон с работниками, а не безудержно порождать процессы по одному на каждую проверку.

Решение с multiprocessing

Главный процесс делает примерно то же самое, что в асинхронном варианте, только задачи на проверку раздаёт не поштучно, а наборами по тысяче штук

def parallel_brute_force(
    number_of_workers: int = Defaults.number_of_workers,
    password_max_length: int = Defaults.password_max_length,
    chunk_size: int = Defaults.chunk_size,
):
    """Параллельный подбор пароля.

    Параметры:
    - number_of_workers -- количество рабочих процессов.
    - password_max_length -- максимальная длина пароля.
    - chunk_size -- количество вариантов паролей, передаваемых рабочему процессу за один раз.
    """
    guess_queue = mp.Queue(number_of_workers)
    result_queue = mp.Queue(1)
    processes = []
    generator = password_generator(password_max_length)

    # запускаем рабочие процессы
    for i in range(number_of_workers):
        p = mp.Process(target=worker, args=(f"w_{i}", guess_queue, result_queue))
        processes.append(p)
        p.start()

    password = None
    while True:
        if not result_queue.empty():
            # пароль найден
            password = result_queue.get()
            print(f"main: Пароль найден: {password}")
            break
        chunk = list(itertools.islice(generator, chunk_size))
        if not chunk:
            # все варианты перебраны
            break
        worker_print(f"main: Передача набора: {chunk[0]} - {chunk[-1]}")
        guess_queue.put(chunk)

    # ставим в очередь рабочим процессам сигнал завершения
    for i in range(number_of_workers):
        guess_queue.put(None)
    # ждем завершения всех процессов
    for p in processes:
        p.join()

    # закрываем очереди
    guess_queue.close()
    result_queue.close()

    # печать результата
    if password is not None:
        print(f"Пароль найден: {password}")
    else:
        print("Пароль не найден")
    return True

Тут тоже не совсем чистое решение, в один флакон замешаны создание параллельной инфраструктуры и генерация задач для работников, но и мы не договаривались на разработку production grade. Зато чистый message passing: главный процесс пишет работникам в очередь, а работник, нашедший решение, тоже пишет главному процессу в очередь.

_print_allowed = False


def worker_print(*args, **kwargs):  # pylint: disable=redefined-outer-name
    """Печать сообщения в рабочем процессе"""
    if _print_allowed:
        print(*args, **kwargs)


def worker(name: str, queue_in: mp.Queue, queue_out: mp.Queue):
    """Рабочий процесс для проверки паролей.

    Параметры:
    - name -- имя рабочего процесса. Используется для отладки.
    - queue_in -- очередь входных данных. Из этой очереди рабочий процесс получает
      набор вариантов паролей для проверки.
    - queue_out -- очередь выходных данных. Если найден пароль, он будет помещен в эту очередь.
    """
    worker_print(f"worker {name}: Начало работы")
    while True:
        guesses = queue_in.get()
        if guesses is None:
            worker_print(f"worker {name}: Завершение работы")
            return
        worker_print(
            f"worker {name}: Получен набор вариантов {guesses[0]} - {guesses[-1]}",
        )
        for password in guesses:
            if check_password(password):
                worker_print(f"worker {name}: Пароль найден: {password}")
                queue_out.put(password)
                return

Работники получают список либо None. В первом случае они перебирают предложенные главным процессом варианты, во втором завершают работу.

В этом решении параллельность настоящая. Процессы-работники операционная система раскидывает по отдельным ядрам, и они выполняются независимо, в разных адресных пространствах.

12 работников находят пароль на 16-ядерном core i7 за 2 секунды.

"""Многопроцессорный подбор пароля. Игрушечный пример."""

import argparse
import itertools
import string
import multiprocessing as mp
import time

# Пароль, который нужно найти
_password = "zxcgg"


# Функция проверки предполагаемого пароля
def check_password(password: str) -> bool:
    """Проверка пароля"""
    return password == _password


def password_generator(max_length: int):
    """Генерация всех возможных паролей заданной длины"""

    def fixed_length_generator(length: int):
        chars = string.ascii_lowercase
        for password in itertools.product(chars, repeat=length):
            yield "".join(password)

    for length in range(1, max_length + 1):
        yield from fixed_length_generator(length)


_print_allowed = False


def worker_print(*args, **kwargs):  # pylint: disable=redefined-outer-name
    """Печать сообщения в рабочем процессе"""
    if _print_allowed:
        print(*args, **kwargs)


def worker(name: str, queue_in: mp.Queue, queue_out: mp.Queue):
    """Рабочий процесс для проверки паролей.

    Параметры:
    - name -- имя рабочего процесса. Используется для отладки.
    - queue_in -- очередь входных данных. Из этой очереди рабочий процесс получает
      набор вариантов паролей для проверки.
    - queue_out -- очередь выходных данных. Если найден пароль, он будет помещен в эту очередь.
    """
    worker_print(f"worker {name}: Начало работы")
    while True:
        guesses = queue_in.get()
        if guesses is None:
            worker_print(f"worker {name}: Завершение работы")
            return
        worker_print(
            f"worker {name}: Получен набор вариантов {guesses[0]} - {guesses[-1]}",
        )
        for password in guesses:
            if check_password(password):
                worker_print(f"worker {name}: Пароль найден: {password}")
                queue_out.put(password)
                return


class Defaults:
    """Параметры по умолчанию"""

    password_max_length = 8
    number_of_workers = mp.cpu_count() - 1
    chunk_size = 1000


def parallel_brute_force(
    number_of_workers: int = Defaults.number_of_workers,
    password_max_length: int = Defaults.password_max_length,
    chunk_size: int = Defaults.chunk_size,
):
    """Параллельный подбор пароля.

    Параметры:
    - number_of_workers -- количество рабочих процессов.
    - password_max_length -- максимальная длина пароля.
    - chunk_size -- количество вариантов паролей, передаваемых рабочему процессу за один раз.
    """
    guess_queue = mp.Queue(number_of_workers)
    result_queue = mp.Queue(1)
    processes = []
    generator = password_generator(password_max_length)

    # запускаем рабочие процессы
    for i in range(number_of_workers):
        p = mp.Process(target=worker, args=(f"w_{i}", guess_queue, result_queue))
        processes.append(p)
        p.start()

    password = None
    while True:
        if not result_queue.empty():
            # пароль найден
            password = result_queue.get()
            print(f"main: Пароль найден: {password}")
            break
        chunk = list(itertools.islice(generator, chunk_size))
        if not chunk:
            # все варианты перебраны
            break
        worker_print(f"main: Передача набора: {chunk[0]} - {chunk[-1]}")
        guess_queue.put(chunk)

    # ставим в очередь рабочим процессам сигнал завершения
    for i in range(number_of_workers):
        guess_queue.put(None)
    # ждем завершения всех процессов
    for p in processes:
        p.join()

    # закрываем очереди
    guess_queue.close()
    result_queue.close()

    # печать результата
    if password is not None:
        print(f"Пароль найден: {password}")
    else:
        print("Пароль не найден")
    return True


if __name__ == "__main__":
    argparser = argparse.ArgumentParser(description="Параллельный подбор пароля")
    argparser.add_argument(
        "--max-length",
        "-l",
        type=int,
        default=Defaults.password_max_length,
        help=f"Максимальная длина пароля (по умолчанию {Defaults.password_max_length})",
    )
    argparser.add_argument(
        "--workers",
        "-w",
        type=int,
        default=Defaults.number_of_workers,
        help=f"Количество рабочих процессов (по умолчанию {Defaults.number_of_workers})",
    )
    argparser.add_argument(
        "--chunk-size",
        "-c",
        type=int,
        default=Defaults.chunk_size,
        help=f"Количество вариантов паролей, передаваемых рабочему процессу за один раз (по умолчанию {Defaults.chunk_size})",
    )
    argparser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Вывод отладочной информации",
    )
    args = argparser.parse_args()

    print("Параллельный подбор пароля:")
    print("Параметры:")
    print(f"  Максимальная длина пароля: {argparser.parse_args().max_length}")
    print(f"  Количество рабочих процессов: {argparser.parse_args().workers}")
    print(
        f"  Число вариантов, передаваемых в рабочий процесс: {argparser.parse_args().chunk_size}"
    )
    _print_allowed = args.verbose

    start_time = time.time()
    parallel_brute_force(
        number_of_workers=argparser.parse_args().workers,
        password_max_length=argparser.parse_args().max_length,
        chunk_size=argparser.parse_args().chunk_size,
    )
    print(f"Время параллельного перебора: {time.time() - start_time:.2f} секунд")

До кучи я добавил параметры командной строки, чтобы поиграться с параметрами. Вот пример запуска параллельного подбора на 12 процессах с тысячей вариантов в одном задании.

$ python3.12 -O ./multi.py -w 12 -c 1000
Параллельный подбор пароля:
Параметры:
  Максимальная длина пароля: 8
  Количество рабочих процессов: 12
  Число вариантов, передаваемых в рабочий процесс: 1000
main: Пароль найден: zxcgg
Пароль найден: zxcgg
Время параллельного перебора: 2.47 секунд
→ Ссылка
Автор решения: CrazyElf

В данной постановке вопроса, с такой функцией проверки, асинхронность/многопоточность только вредит. Вы создаёте очень много задач, которые висят в памяти, а потом их все разом проверяете, при том, что нужная вам задача может быть давно уже закончилась, а вы всё продолжаете плодить задачи.

Для начала можно периодически разгребать задачи - какие-то задачи уже закончились, лучше их разобрать уже и забыть:

        for i,attempt in enumerate(itertools.product(characters, repeat=length), 1):
            attempt = ''.join(attempt)
            task = asyncio.create_task(check_password(attempt))
            tasks.append(task)
            if i % 2:
                done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                results = await asyncio.gather(*done)
                if any(results):
                    return
                tasks = list(pending)

Причём, опыты показали, что чем чаще разбирать задачи, тем лучше. Тут я после создания каждых 3-х задач чищу очередь от уже закончившихся. Результат на моём не самом быстром компьютере: 28 секунд.

Но можно пойти дальше и вообще убрать очередь, потому что ваша простая проверка скорее всего отрабатывает моментально и никакая очередь тут вообще не нужна:

        for attempt in itertools.product(characters, repeat=length):
            result = await check_password(''.join(attempt))
            if result:
                return

Результат на моей машине: 5.5 секунд.

Но ещё лучше - вообще избавиться от асинхронности. Вы только зря создаёте задачи, на самом деле тут не нужна асинхронность. Убираем все async и await из кода, в частности из цикла:

        for attempt in itertools.product(characters, repeat=length):
            result = check_password(''.join(attempt))
            if result:
                return

Результат на моей машине: 4 секунды.

Для других функций проверки результаты могут быть другими, возможно там нужна будет многопоточность или асинхронность, но это сильно зависит от конкретики, просто так использовать асинхронность или многопоточность - это далеко не всегда ускорение кода. А бывает что и (как в коде, приведённом в вопросе) - это наоборот сильное замедление работы кода.

P.S. Первым делом я, конечно, убрал из кода print(f"Пробуем: {attempt}"), печать сильно замедляет работу кода, она нужна только для отладочных запусков, а если мы боремся за скорость, её нужно убирать.

→ Ссылка