RuntimeError: Event loop is closed + ValueError: I/O operation on closed pipe - как избавится?
Перепробовал все что нашел в интернете: ставил sleep, менял политику селектора(начинает выдавать другое исключение). Возможно что проблема только на Windows. Еще возможно что-то с сабпроцесами так как исключения появились после добавления асинхрона именно в сабпроцесы.
def post_html(request, log):
with ThreadPoolExecutor(max_workers=500) as pool:
log.info(f"Request received")
connection = psycopg2.connect(host=DATABASES['default']['HOST'], user=DATABASES['default']['USER'],
password=DATABASES['default']['PASSWORD'], database=DATABASES['default']['NAME'])
connection.autocommit = True
request = codecs.decode(request).replace('proxy_list=', '').replace('\r', '').strip('\n').split('\n')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
futures = [loop.run_in_executor(pool, run, i.rstrip('\n').rstrip().split(':'), log, connection)
for i in request]
result = loop.run_until_complete(asyncio.gather(*futures))
connection.close()
return result
def run(proxy_attr, log, connection):
proxy = Proxy(proxy_attr[0], proxy_attr[1], proxy_attr[2], proxy_attr[3])
log.info(f'Check proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password}')
id_proxy = add_proxy_to_database(connection, proxy)
log.info(f'Proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password} add database')
asyncio.run(proxy.check_dns(log))
async def check_dns(self, log):
dns = ['https://google.com', 'https://208.67.222.222']
protocols = ['http', 'socks5']
futures = [asyncio.create_task(check_dns_no_class(self.user, self.password, self.ip, self.port, url, protocol,
log)) for url in dns for protocol in protocols]
data = await wait_first_completed_not_none(futures)
if data is not None:
log.debug(f'[{self.ip}:{self.port}:{self.user}:{self.password}][{data["dns"]}][{data["protocol"]}] '
f'Response: PROXY CHECKED on dns')
self.status = 1
self.protocol = data['protocol']
self.dns = data['dns']
async def check_dns_no_class(user, password, ip, port, url, protocol, log):
w = "%{http_code}"
curl_url = f'curl -x "{protocol}://{user}:{password}@{ip}:{port}" -w {w} {url}'
log.info(f'[{ip}:{port}:{user}:{password}][{url}] Сurl request: {protocol}')
process = await asyncio.create_subprocess_shell(curl_url, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=3)
data = codecs.decode(stdout)
error = re.findall(r"curl:[^\r\n]*", codecs.decode(stderr))
except asyncio.TimeoutError:
log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: Time Out')
return
code = data
if code == '000':
log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: {error}')
return
else:
return {'protocol': protocol, 'dns': url}
async def wait_first_completed_not_none(futures):
while True:
# Если получили пустой список,
# сразу возвращаем None, иначе будет бесконечный цикл
if not futures:
return None
done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
# done - список завершившихся тасков
# pending - список незавершившихся тасков
# Циклом идем по списку завершившихся тасков, проверяем результат
for item in done:
result = item.result()
if result is not None:
# Останавливаем незавершившиеся таски
for pending_task in pending:
pending_task.cancel()
return result
# Если среди выполненных не было результатов не None, то идем на второй круг
# Список незавершившихся тасков становится списком тасков, из которых мы будем ожидать результата
futures = pending
Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x00000235E9C071C0>
Traceback (most recent call last):
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 126, in __del__
self.close()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 104, in close
proto.pipe.close()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 109, in close
self._loop.call_soon(self._call_connection_lost, None)
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 753, in call_soon
self._check_closed()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 515, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000235E9C1CD30>
Traceback (most recent call last):
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in __del__
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
info.append(f'fd={self._sock.fileno()}')
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
При смене политики получаем следующее:
Traceback (most recent call last):
File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\core\handlers\exception.py", line 56, in inner
response = get_response(request)
File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\core\handlers\base.py", line 197, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\views\decorators\csrf.py", line 55, in wrapped_view
return view_func(*args, **kwargs)
File "D:\Pycharm\checker_proxy\checker\views.py", line 29, in index
result = post_html(request.body, log)
File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\views\decorators\csrf.py", line 55, in wrapped_view
return view_func(*args, **kwargs)
File "D:\Pycharm\checker_proxy\checker\views.py", line 45, in post_html
result = loop.run_until_complete(asyncio.gather(*futures))
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 649, in run_until_complete
return future.result()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "D:\Pycharm\checker_proxy\checker\views.py", line 120, in run
asyncio.run(proxy.check_site(log))
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 649, in run_until_complete
return future.result()
File "D:\Pycharm\checker_proxy\checker\views.py", line 223, in check_site
data = await wait_first_completed_not_none(futures)
File "D:\Pycharm\checker_proxy\checker\views.py", line 167, in wait_first_completed_not_none
result = item.result()
File "D:\Pycharm\checker_proxy\checker\views.py", line 98, in check_site_no_class
process = await asyncio.create_subprocess_shell(curl_url, stdout=asyncio.subprocess.PIPE,
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\subprocess.py", line 205, in create_subprocess_shell
transport, protocol = await loop.subprocess_shell(
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 1648, in subprocess_shell
transport = await self._make_subprocess_transport(
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError