Код долго думает после 4 символа
import asyncio
import string
import itertools
import time
PASSWORD = 'zxcgg'
async def check_password(attempt):
print(f"Пробуем: {attempt}")
if attempt == PASSWORD:
print(f"Пароль найден: {attempt}")
return True
else:
return False
async def async_bruteforce():
characters = string.ascii_lowercase
for length in range(1, len(PASSWORD) + 1):
tasks = []
for attempt in itertools.product(characters, repeat=length):
attempt = ''.join(attempt)
task = asyncio.create_task(check_password(attempt))
tasks.append(task)
# Ждём завершения всех задач текущей длины
results = await asyncio.gather(*tasks)
if any(results):
return
print("\n Асинхронный перебор пароля:")
async_start_time = time.time()
asyncio.run(async_bruteforce())
print(f"Время асинхронного перебора: {time.time() - async_start_time:.2f} секунд")
Ответы (2 шт):
Ваш код очень сильно задумывается вот в этом месте: results = await asyncio.gather(*tasks)
, где вы сначала асинхронно запускаете почти двенадцать миллионов задач, а затем собираете их результаты в список (26**5 = 11 881 376
).
Ваш код аллоцирует отдельного исполнителя под каждую задачу, и запускает исполнителя ровно один раз. Так можно поступать, но не нужно. Гораздо практичнее использовать пул многоразовых исполнителей, которым главный процесс раздаёт задачи и собирает с них результаты. Для этого в asyncio
есть специальный примитив asyncio.Queue
, с помощью которого шаблон producer/consumer реализуется легко и непринуждённо:
# главный процесс
async def task_builder(max_length: int, tasks: asyncio.Queue, password: ResutlStorage[str]):
"""Генератор пробных строк для подбора пароля"""
print("task_builder: Начало работы")
for attempt in iterate_passwords(max_length):
await tasks.put(attempt)
if password.has_result():
print(f"task_builder: Пароль найден: {password.get_result()}")
print("task_builder: Завершение работы")
return
# Работники
async def task_executor(num: int, tasks: asyncio.Queue, password: ResutlStorage[str]):
"""Потребитель пробных строк, выполняет проверку пароля.
Параметры:
num -- номер работника
tasks -- очередь задач
password -- хранилище результата
"""
print(f"task_executor {num}: Начало работы")
while True:
if password.has_result():
print(f"task_executor {num}: Пароль найден другим работником")
print(f"task_executor {num}: Завершение работы")
return
attempt = await tasks.get()
print(f"task_executor {num}: Пробуем: {attempt}")
if await check_password(attempt):
print(f"task_executor {num}: Пароль найден: {attempt}")
password.set_result(attempt)
print(f"task_executor {num}: Завершение работы")
return
Главный процесс перебирает пароли и отправляет их работникам на проверку. Тот работник, который нашел пароль, записывает его в специальный регистр. Регистр, конечно, не очень хорошее решение. Идеал требует, чтобы работники и главный процесс общались только через message-passing, и не имели бы общее состояние, но когда хочется - то можно ))
Полное решение здесь: https://pastebin.com/A5FcvcTh
Поиск пароля на core i7 занимает 31 секунду. Но... как вы, безусловно, знаете, asyncio
только на словах асинхронная библиотека. Даже с ключевым словом async
Пайтон продолжает выполняться в один поток. Поэтому это решение только прикидывается параллельным, оставаясь в глубине реализации сугубо синхронным однопоточным. Настоящая параллельность в Пайтоне реализуется только через multiprocessing
. И там тем более нужно использовать шаблон с работниками, а не безудержно порождать процессы по одному на каждую проверку.
Решение с multiprocessing
Главный процесс делает примерно то же самое, что в асинхронном варианте, только задачи на проверку раздаёт не поштучно, а наборами по тысяче штук
def parallel_brute_force(
number_of_workers: int = Defaults.number_of_workers,
password_max_length: int = Defaults.password_max_length,
chunk_size: int = Defaults.chunk_size,
):
"""Параллельный подбор пароля.
Параметры:
- number_of_workers -- количество рабочих процессов.
- password_max_length -- максимальная длина пароля.
- chunk_size -- количество вариантов паролей, передаваемых рабочему процессу за один раз.
"""
guess_queue = mp.Queue(number_of_workers)
result_queue = mp.Queue(1)
processes = []
generator = password_generator(password_max_length)
# запускаем рабочие процессы
for i in range(number_of_workers):
p = mp.Process(target=worker, args=(f"w_{i}", guess_queue, result_queue))
processes.append(p)
p.start()
password = None
while True:
if not result_queue.empty():
# пароль найден
password = result_queue.get()
print(f"main: Пароль найден: {password}")
break
chunk = list(itertools.islice(generator, chunk_size))
if not chunk:
# все варианты перебраны
break
worker_print(f"main: Передача набора: {chunk[0]} - {chunk[-1]}")
guess_queue.put(chunk)
# ставим в очередь рабочим процессам сигнал завершения
for i in range(number_of_workers):
guess_queue.put(None)
# ждем завершения всех процессов
for p in processes:
p.join()
# закрываем очереди
guess_queue.close()
result_queue.close()
# печать результата
if password is not None:
print(f"Пароль найден: {password}")
else:
print("Пароль не найден")
return True
Тут тоже не совсем чистое решение, в один флакон замешаны создание параллельной инфраструктуры и генерация задач для работников, но и мы не договаривались на разработку production grade. Зато чистый message passing: главный процесс пишет работникам в очередь, а работник, нашедший решение, тоже пишет главному процессу в очередь.
_print_allowed = False
def worker_print(*args, **kwargs): # pylint: disable=redefined-outer-name
"""Печать сообщения в рабочем процессе"""
if _print_allowed:
print(*args, **kwargs)
def worker(name: str, queue_in: mp.Queue, queue_out: mp.Queue):
"""Рабочий процесс для проверки паролей.
Параметры:
- name -- имя рабочего процесса. Используется для отладки.
- queue_in -- очередь входных данных. Из этой очереди рабочий процесс получает
набор вариантов паролей для проверки.
- queue_out -- очередь выходных данных. Если найден пароль, он будет помещен в эту очередь.
"""
worker_print(f"worker {name}: Начало работы")
while True:
guesses = queue_in.get()
if guesses is None:
worker_print(f"worker {name}: Завершение работы")
return
worker_print(
f"worker {name}: Получен набор вариантов {guesses[0]} - {guesses[-1]}",
)
for password in guesses:
if check_password(password):
worker_print(f"worker {name}: Пароль найден: {password}")
queue_out.put(password)
return
Работники получают список либо None
. В первом случае они перебирают предложенные главным процессом варианты, во втором завершают работу.
В этом решении параллельность настоящая. Процессы-работники операционная система раскидывает по отдельным ядрам, и они выполняются независимо, в разных адресных пространствах.
12 работников находят пароль на 16-ядерном core i7 за 2 секунды.
"""Многопроцессорный подбор пароля. Игрушечный пример."""
import argparse
import itertools
import string
import multiprocessing as mp
import time
# Пароль, который нужно найти
_password = "zxcgg"
# Функция проверки предполагаемого пароля
def check_password(password: str) -> bool:
"""Проверка пароля"""
return password == _password
def password_generator(max_length: int):
"""Генерация всех возможных паролей заданной длины"""
def fixed_length_generator(length: int):
chars = string.ascii_lowercase
for password in itertools.product(chars, repeat=length):
yield "".join(password)
for length in range(1, max_length + 1):
yield from fixed_length_generator(length)
_print_allowed = False
def worker_print(*args, **kwargs): # pylint: disable=redefined-outer-name
"""Печать сообщения в рабочем процессе"""
if _print_allowed:
print(*args, **kwargs)
def worker(name: str, queue_in: mp.Queue, queue_out: mp.Queue):
"""Рабочий процесс для проверки паролей.
Параметры:
- name -- имя рабочего процесса. Используется для отладки.
- queue_in -- очередь входных данных. Из этой очереди рабочий процесс получает
набор вариантов паролей для проверки.
- queue_out -- очередь выходных данных. Если найден пароль, он будет помещен в эту очередь.
"""
worker_print(f"worker {name}: Начало работы")
while True:
guesses = queue_in.get()
if guesses is None:
worker_print(f"worker {name}: Завершение работы")
return
worker_print(
f"worker {name}: Получен набор вариантов {guesses[0]} - {guesses[-1]}",
)
for password in guesses:
if check_password(password):
worker_print(f"worker {name}: Пароль найден: {password}")
queue_out.put(password)
return
class Defaults:
"""Параметры по умолчанию"""
password_max_length = 8
number_of_workers = mp.cpu_count() - 1
chunk_size = 1000
def parallel_brute_force(
number_of_workers: int = Defaults.number_of_workers,
password_max_length: int = Defaults.password_max_length,
chunk_size: int = Defaults.chunk_size,
):
"""Параллельный подбор пароля.
Параметры:
- number_of_workers -- количество рабочих процессов.
- password_max_length -- максимальная длина пароля.
- chunk_size -- количество вариантов паролей, передаваемых рабочему процессу за один раз.
"""
guess_queue = mp.Queue(number_of_workers)
result_queue = mp.Queue(1)
processes = []
generator = password_generator(password_max_length)
# запускаем рабочие процессы
for i in range(number_of_workers):
p = mp.Process(target=worker, args=(f"w_{i}", guess_queue, result_queue))
processes.append(p)
p.start()
password = None
while True:
if not result_queue.empty():
# пароль найден
password = result_queue.get()
print(f"main: Пароль найден: {password}")
break
chunk = list(itertools.islice(generator, chunk_size))
if not chunk:
# все варианты перебраны
break
worker_print(f"main: Передача набора: {chunk[0]} - {chunk[-1]}")
guess_queue.put(chunk)
# ставим в очередь рабочим процессам сигнал завершения
for i in range(number_of_workers):
guess_queue.put(None)
# ждем завершения всех процессов
for p in processes:
p.join()
# закрываем очереди
guess_queue.close()
result_queue.close()
# печать результата
if password is not None:
print(f"Пароль найден: {password}")
else:
print("Пароль не найден")
return True
if __name__ == "__main__":
argparser = argparse.ArgumentParser(description="Параллельный подбор пароля")
argparser.add_argument(
"--max-length",
"-l",
type=int,
default=Defaults.password_max_length,
help=f"Максимальная длина пароля (по умолчанию {Defaults.password_max_length})",
)
argparser.add_argument(
"--workers",
"-w",
type=int,
default=Defaults.number_of_workers,
help=f"Количество рабочих процессов (по умолчанию {Defaults.number_of_workers})",
)
argparser.add_argument(
"--chunk-size",
"-c",
type=int,
default=Defaults.chunk_size,
help=f"Количество вариантов паролей, передаваемых рабочему процессу за один раз (по умолчанию {Defaults.chunk_size})",
)
argparser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Вывод отладочной информации",
)
args = argparser.parse_args()
print("Параллельный подбор пароля:")
print("Параметры:")
print(f" Максимальная длина пароля: {argparser.parse_args().max_length}")
print(f" Количество рабочих процессов: {argparser.parse_args().workers}")
print(
f" Число вариантов, передаваемых в рабочий процесс: {argparser.parse_args().chunk_size}"
)
_print_allowed = args.verbose
start_time = time.time()
parallel_brute_force(
number_of_workers=argparser.parse_args().workers,
password_max_length=argparser.parse_args().max_length,
chunk_size=argparser.parse_args().chunk_size,
)
print(f"Время параллельного перебора: {time.time() - start_time:.2f} секунд")
До кучи я добавил параметры командной строки, чтобы поиграться с параметрами. Вот пример запуска параллельного подбора на 12 процессах с тысячей вариантов в одном задании.
$ python3.12 -O ./multi.py -w 12 -c 1000
Параллельный подбор пароля:
Параметры:
Максимальная длина пароля: 8
Количество рабочих процессов: 12
Число вариантов, передаваемых в рабочий процесс: 1000
main: Пароль найден: zxcgg
Пароль найден: zxcgg
Время параллельного перебора: 2.47 секунд
В данной постановке вопроса, с такой функцией проверки, асинхронность/многопоточность только вредит. Вы создаёте очень много задач, которые висят в памяти, а потом их все разом проверяете, при том, что нужная вам задача может быть давно уже закончилась, а вы всё продолжаете плодить задачи.
Для начала можно периодически разгребать задачи - какие-то задачи уже закончились, лучше их разобрать уже и забыть:
for i,attempt in enumerate(itertools.product(characters, repeat=length), 1):
attempt = ''.join(attempt)
task = asyncio.create_task(check_password(attempt))
tasks.append(task)
if i % 2:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
results = await asyncio.gather(*done)
if any(results):
return
tasks = list(pending)
Причём, опыты показали, что чем чаще разбирать задачи, тем лучше. Тут я после создания каждых 3-х задач чищу очередь от уже закончившихся. Результат на моём не самом быстром компьютере: 28 секунд.
Но можно пойти дальше и вообще убрать очередь, потому что ваша простая проверка скорее всего отрабатывает моментально и никакая очередь тут вообще не нужна:
for attempt in itertools.product(characters, repeat=length):
result = await check_password(''.join(attempt))
if result:
return
Результат на моей машине: 5.5 секунд.
Но ещё лучше - вообще избавиться от асинхронности. Вы только зря создаёте задачи, на самом деле тут не нужна асинхронность. Убираем все async
и await
из кода, в частности из цикла:
for attempt in itertools.product(characters, repeat=length):
result = check_password(''.join(attempt))
if result:
return
Результат на моей машине: 4 секунды.
Для других функций проверки результаты могут быть другими, возможно там нужна будет многопоточность или асинхронность, но это сильно зависит от конкретики, просто так использовать асинхронность или многопоточность - это далеко не всегда ускорение кода. А бывает что и (как в коде, приведённом в вопросе) - это наоборот сильное замедление работы кода.
P.S. Первым делом я, конечно, убрал из кода print(f"Пробуем: {attempt}")
, печать сильно замедляет работу кода, она нужна только для отладочных запусков, а если мы боремся за скорость, её нужно убирать.