Как асинхронно проверить прокси в python?
Как асинхронно проверить прокси в python? Я взяла за пример библиотеку proxy-checker и написала код с нужными мне параметрами. Он полностью рабочий, но одна проверка занимает от 10 до 20 секунд и так как метод не асинхронный, во время выполнения на PyQt5 интерфейс подвисает и отвисает на секунду только между каждой проверкой.
Можно ли данный код переписать с асинхронностью или возможно есть другие, уже готовые, асинхронные методы проверки прокси?
Я пробовала использовать:
loop = asyncio.get_event_loop()
res = await loop.run_in_executor(None, ..., ...)
Интерфейс не зависал, но при этом почему-то не возвращались результаты выполнения, к тому же это не является прямым решением.
Вот весь мой код:
import re
import time
import pycurl
import random
from io import BytesIO
proxy_judges = ['http://proxyjudge.us/azenv.php', 'http://mojeip.net.pl/asdfa/azenv.php']
def send_query(proxy=False, url=None, user=None, password=None):
response = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, url or random.choice(proxy_judges))
c.setopt(c.WRITEDATA, response)
c.setopt(c.TIMEOUT, 5)
if user is not None and password is not None:
c.setopt(c.PROXYUSERPWD, f"{user}:{password}")
c.setopt(c.SSL_VERIFYHOST, 0)
c.setopt(c.SSL_VERIFYPEER, 0)
if proxy:
c.setopt(c.PROXY, proxy)
try:
c.perform()
except:
return False
if c.getinfo(c.HTTP_CODE) != 200:
return False
timeout = round(c.getinfo(c.CONNECT_TIME) * 1000)
response = response.getvalue().decode('iso-8859-1')
return {
'timeout': timeout,
'response': response
}
def get_country(ip):
r = send_query(url='https://ip2c.org/' + ip)
if r and r['response'][0] == '1':
r = r['response'].split(';')
return [r[3], r[1]]
return ['-', '-']
def check_proxy(proxy, check_country=True, check_address=False, user=None, password=None):
timeout, protocols = 0, {}
for protocol in ['http', 'socks4', 'socks5']:
r = send_query(proxy=protocol + '://' + proxy, user=user, password=password)
if not r:
continue
protocols[protocol] = r
timeout += r['timeout']
if len(protocols) == 0:
return False
r = protocols[random.choice(list(protocols.keys()))]['response']
timeout = timeout // len(protocols)
results = {
'protocols': list(protocols.keys()),
'timeout': timeout
}
if check_country:
country = get_country(proxy.split(':')[0])
results['country'] = country[0]
results['country_code'] = country[1]
if check_address:
remote_regex = r'REMOTE_ADDR = (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
remote_addr = re.search(remote_regex, r)
if remote_addr:
remote_addr = remote_addr.group(1)
results['remote_address'] = remote_addr
return results
if __name__ == '__main__':
all_proxies = open('proxies.txt', 'r', encoding='utf-8').read().split('\n')
for proxy in all_proxies:
cur_proxy = proxy.split(':')
ip_port = cur_proxy[0] + ':' + cur_proxy[1]
proxy_log, proxy_pass = cur_proxy[2], cur_proxy[3]
start_time = time.time()
res = check_proxy(ip_port, check_country=True, user=proxy_log, password=proxy_pass)
if res:
protocol = str(res['protocols'][0])
timeout = str(res['timeout'])
country = str(res['country'])
code = str(res['country_code'])
print("Прошло %s секунд" % (round(time.time() - start_time)))
print(f'{proxy}: timeout {timeout} ({protocol}) | {country} ({code})')
Пример вывода:
Прошло 15 секунд
...: timeout 86 (http) | Russian Federation (RU)
Прошло 16 секунд
...: timeout 89 (http) | - (-)
Прошло 12 секунд
...: timeout 87 (http) | Russian Federation (RU)
Прошло 13 секунд
...: timeout 83 (http) | Russian Federation (RU)
Ответы (2 шт):
Вариант 1. Можно воспользоваться стандартным threading из python
import threading
def check_proxies(filename, mode, encoding):
all_proxies = open(filename, mode, encoding).read().split('\n')
for proxy in all_proxies:
cur_proxy = proxy.split(':')
ip_port = cur_proxy[0] + ':' + cur_proxy[1]
proxy_log, proxy_pass = cur_proxy[2], cur_proxy[3]
start_time = time.time()
res = check_proxy(ip_port, check_country=True, user=proxy_log, password=proxy_pass)
if res:
protocol = str(res['protocols'][0])
timeout = str(res['timeout'])
country = str(res['country'])
code = str(res['country_code'])
print("Прошло %s секунд" % (round(time.time() - start_time)))
print(f'{proxy}: timeout {timeout} ({protocol}) | {country} ({code})')
if __name__ == '__main__':
# Создаем объект потока с нужной нам функцией
thread_check_proxies = threading.Thread(target=check_proxies, args=('proxies.txt','r','utf-8'))
# Запускаем поток
thread_check_proxies.start()
Вариант 2. В Qt есть несколько способов работы с потоками, все зависит от того, что именно вам нужно. Например, можно создать класс проверки прокси ("рабочего" - Worker), затем переместить его в поток и запустить соответствующий метод "рабочего" внутри потока:
class CheckProxiesWorker(QtCore.QObject):
# Сигнал с результатом проверки прокси
signal_check_result = QtCore.pyqtSignal(object)
def __init__(self, filename, mode='r', encoding='utf-8'):
super().__init__(self)
self.filename = filename
self.mode = mode
self.encoding = encoding
#
def work(self):
all_proxies = open(self.filename, self.mode, self.encoding).read().split('\n')
for proxy in all_proxies:
cur_proxy = proxy.split(':')
ip_port = cur_proxy[0] + ':' + cur_proxy[1]
proxy_log, proxy_pass = cur_proxy[2], cur_proxy[3]
start_time = time.time()
res = check_proxy(ip_port, check_country=True, user=proxy_log, password=proxy_pass)
if res:
# Эмитируем сигнал с результатом проверки
self.signal_check_result.emit(res)
Где-то в приложении запускаем поток:
# Метод обработки результата проверки
def eval_result(self, res):
# ...
def start_thread_check_proxies:
# создаем поток
self.thread = QtCore.QThread()
# создаем объект рабочего
self.worker = CheckProxiesWorker('proxies.txt', 'r', 'utf-8')
# перенесём объект в поток
self.worker.moveToThread(self.thread)
# соединяем сигнал с методом вывода полученной информации
self.worker.signal_check_result.connect(self.eval_result)
# соединяем сигнал запуска потока с рабочим методом
self.thread.started.connect(self.worker.work)
# запускаем поток
self.thread.start()
Кроме того, можно наследоваться от QThread, тогда все будет немного лаконичнее.
Поскольку в метках вы поставили PyQt5, его и используем. А конкретнее - QThread:
import sys
import time
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import QObject, QThread, pyqtSignal
# ваш код прокси-чекера из вопроса пусть лежит в proxyChecker.py
from proxyChecker import check_proxy
# этот объект будет крутиться в отдельном потоке
class Worker(QObject):
# у объекта будут два сигнала - поток завершен и сигнал с результатом проверки очередного прокси
finished = pyqtSignal()
progress = pyqtSignal(str)
running = False
# это будем использовать если надо будет остановить проверку
def stop(self):
self.running = False
# это основной цикл проверки прокси
def run(self):
self.running = True
all_proxies = open('proxies.txt', 'r', encoding='utf-8').read().split('\n')
for proxy in all_proxies:
self.progress.emit(f'Проверяем: {proxy}')
cur_proxy = proxy.split(':')
ip_port = cur_proxy[0] + ':' + cur_proxy[1]
proxy_log, proxy_pass = cur_proxy[2], cur_proxy[3]
start_time = time.time()
res = check_proxy(ip_port, check_country=True, user=proxy_log, password=proxy_pass)
# отправляем в основной поток инфо р результате тестирования
if res:
protocol = str(res['protocols'][0])
timeout = str(res['timeout'])
country = str(res['country'])
code = str(res['country_code'])
result = f"Прошло {round(time.time() - start_time)} секунд\n"
result += f'{proxy}: timeout {timeout} ({protocol}) | {country} ({code})'
self.progress.emit(result)
else:
result = f"Прошло {round(time.time() - start_time)} секунд\n"
result += f'Недоступен'
self.progress.emit(result)
if not self.running:
break
# говорим основному потоку что мы отработали
self.finished.emit()
class Ui_Form(object):
def setupUi(self, Form):
Form.setObjectName("Form")
Form.resize(453, 408)
self.verticalLayout = QtWidgets.QVBoxLayout(Form)
self.verticalLayout.setObjectName("verticalLayout")
self.verticalLayout_2 = QtWidgets.QVBoxLayout()
self.verticalLayout_2.setObjectName("verticalLayout_2")
self.textBrowser = QtWidgets.QTextBrowser(Form)
self.textBrowser.setObjectName("textBrowser")
self.verticalLayout_2.addWidget(self.textBrowser)
self.verticalLayout.addLayout(self.verticalLayout_2)
self.horizontalLayout = QtWidgets.QHBoxLayout()
self.horizontalLayout.setObjectName("horizontalLayout")
spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.horizontalLayout.addItem(spacerItem)
self.pushButton = QtWidgets.QPushButton(Form)
self.pushButton.setObjectName("pushButton")
self.horizontalLayout.addWidget(self.pushButton)
self.pushButtonCancel = QtWidgets.QPushButton(Form)
self.pushButtonCancel.setObjectName("pushButton2")
self.pushButtonCancel.setEnabled(False)
self.horizontalLayout.addWidget(self.pushButtonCancel)
spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
self.horizontalLayout.addItem(spacerItem1)
self.verticalLayout.addLayout(self.horizontalLayout)
self.retranslateUi(Form)
QtCore.QMetaObject.connectSlotsByName(Form)
def retranslateUi(self, Form):
_translate = QtCore.QCoreApplication.translate
Form.setWindowTitle(_translate("Form", "Example"))
self.pushButton.setText(_translate("Form", "Start"))
self.pushButtonCancel.setText(_translate("Form", "Stop"))
class MyWindow(QtWidgets.QWidget):
def __init__(self, parent=None):
super().__init__()
self.ui = Ui_Form()
self.ui.setupUi(self)
# цепляем кнопочки к функциям запуска/остановки проверки
self.ui.pushButton.clicked.connect(self.runLongTask)
self.ui.pushButtonCancel.clicked.connect(self.breakLongTask)
# функция будет отрабатывать когда поток проверки пришлет строку с результатом
def reportProgress(self, n):
self.ui.textBrowser.append(n)
def breakLongTask(self):
self.worker.stop()
# при нажатии кнопы запуска надо создать поток проверки
def runLongTask(self):
# сделаем активной кнопку остановки проверки
self.ui.pushButtonCancel.setEnabled(True)
# создадим объект QThread
self.thread = QThread()
# создадим объект, который будет крутиться созданном в потоке
self.worker = Worker()
# запихиваем наш объект в новый поток
self.worker.moveToThread(self.thread)
# делаем обвязку сигналов:
# Функция, вызываемая при запуске потока
self.thread.started.connect(self.worker.run)
# при получении от потока сигнала с результатом, обработаем его в reportProgress
self.worker.progress.connect(self.reportProgress)
# действия при завершении потока:
# а) выйти из потока (завершить поток)
self.worker.finished.connect(self.thread.quit)
# б) запланировать удаление объекта, который крутился в потоке
self.worker.finished.connect(self.worker.deleteLater)
# в) запланировать удаление самого объекта QThread
self.thread.finished.connect(self.thread.deleteLater)
# Наш поток готов к запуску. Стартуем
self.thread.start()
# Пока поток работает, кнопка запуска будет недоступна
self.ui.pushButton.setEnabled(False)
# После завершения потока сделаем кнопку запуска активной а кнопку остановки неактивной
self.thread.finished.connect(
lambda: (
self.ui.textBrowser.append("finished"),
self.ui.pushButtonCancel.setEnabled(False),
self.ui.pushButton.setEnabled(True)
)
)
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
window = MyWindow()
window.show()
sys.exit(app.exec())