RuntimeError: Event loop is closed + ValueError: I/O operation on closed pipe - как избавится?

Перепробовал все что нашел в интернете: ставил sleep, менял политику селектора(начинает выдавать другое исключение). Возможно что проблема только на Windows. Еще возможно что-то с сабпроцесами так как исключения появились после добавления асинхрона именно в сабпроцесы.

def post_html(request, log):
    with ThreadPoolExecutor(max_workers=500) as pool:
        log.info(f"Request received")
        connection = psycopg2.connect(host=DATABASES['default']['HOST'], user=DATABASES['default']['USER'],
                                      password=DATABASES['default']['PASSWORD'], database=DATABASES['default']['NAME'])
        connection.autocommit = True
        request = codecs.decode(request).replace('proxy_list=', '').replace('\r', '').strip('\n').split('\n')
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        futures = [loop.run_in_executor(pool, run, i.rstrip('\n').rstrip().split(':'), log, connection)
                   for i in request]
        result = loop.run_until_complete(asyncio.gather(*futures))
        connection.close()
        return result
def run(proxy_attr, log, connection):
    proxy = Proxy(proxy_attr[0], proxy_attr[1], proxy_attr[2], proxy_attr[3])
    log.info(f'Check proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password}')
    id_proxy = add_proxy_to_database(connection, proxy)
    log.info(f'Proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password} add database')
    asyncio.run(proxy.check_dns(log))
async def check_dns(self, log):
    dns = ['https://google.com', 'https://208.67.222.222']
    protocols = ['http', 'socks5']

    futures = [asyncio.create_task(check_dns_no_class(self.user, self.password, self.ip, self.port, url, protocol,
                                                      log)) for url in dns for protocol in protocols]
    data = await wait_first_completed_not_none(futures)
    if data is not None:
        log.debug(f'[{self.ip}:{self.port}:{self.user}:{self.password}][{data["dns"]}][{data["protocol"]}] '
                  f'Response: PROXY CHECKED on dns')
        self.status = 1
        self.protocol = data['protocol']
        self.dns = data['dns']

async def check_dns_no_class(user, password, ip, port, url, protocol, log):
    w = "%{http_code}"
    curl_url = f'curl -x "{protocol}://{user}:{password}@{ip}:{port}" -w {w} {url}'
    log.info(f'[{ip}:{port}:{user}:{password}][{url}] Сurl request: {protocol}')
    process = await asyncio.create_subprocess_shell(curl_url, stdout=asyncio.subprocess.PIPE,
                                                    stderr=asyncio.subprocess.PIPE)
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=3)
        data = codecs.decode(stdout)
        error = re.findall(r"curl:[^\r\n]*", codecs.decode(stderr))
    except asyncio.TimeoutError:
        log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: Time Out')
        return
    code = data
    if code == '000':
        log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: {error}')
        return
    else:
        return {'protocol': protocol, 'dns': url}
async def wait_first_completed_not_none(futures):
    while True:
        # Если получили пустой список,
        # сразу возвращаем None, иначе будет бесконечный цикл
        if not futures:
            return None

        done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
        # done - список завершившихся тасков
        # pending - список незавершившихся тасков

        # Циклом идем по списку завершившихся тасков, проверяем результат
        for item in done:
            result = item.result()
            if result is not None:
                # Останавливаем незавершившиеся таски
                for pending_task in pending:
                    pending_task.cancel()
                return result

        # Если среди выполненных не было результатов не None, то идем на второй круг
        # Список незавершившихся тасков становится списком тасков, из которых мы будем ожидать результата
        futures = pending

Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x00000235E9C071C0>
Traceback (most recent call last):
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 126, in __del__
    self.close()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 104, in close
    proto.pipe.close()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 109, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 753, in call_soon
    self._check_closed()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000235E9C1CD30>
Traceback (most recent call last):
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in __del__
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
    info.append(f'fd={self._sock.fileno()}')
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
    raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe

При смене политики получаем следующее:

Traceback (most recent call last):
  File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\core\handlers\exception.py", line 56, in inner
    response = get_response(request)
  File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\core\handlers\base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\views\decorators\csrf.py", line 55, in wrapped_view
    return view_func(*args, **kwargs)
  File "D:\Pycharm\checker_proxy\checker\views.py", line 29, in index
    result = post_html(request.body, log)
  File "D:\Pycharm\checker_proxy\venv\lib\site-packages\django\views\decorators\csrf.py", line 55, in wrapped_view
    return view_func(*args, **kwargs)
  File "D:\Pycharm\checker_proxy\checker\views.py", line 45, in post_html
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 649, in run_until_complete
    return future.result()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "D:\Pycharm\checker_proxy\checker\views.py", line 120, in run
    asyncio.run(proxy.check_site(log))
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 649, in run_until_complete
    return future.result()
  File "D:\Pycharm\checker_proxy\checker\views.py", line 223, in check_site
    data = await wait_first_completed_not_none(futures)
  File "D:\Pycharm\checker_proxy\checker\views.py", line 167, in wait_first_completed_not_none
    result = item.result()
  File "D:\Pycharm\checker_proxy\checker\views.py", line 98, in check_site_no_class
    process = await asyncio.create_subprocess_shell(curl_url, stdout=asyncio.subprocess.PIPE,
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\subprocess.py", line 205, in create_subprocess_shell
    transport, protocol = await loop.subprocess_shell(
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 1648, in subprocess_shell
    transport = await self._make_subprocess_transport(
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
    raise NotImplementedError
NotImplementedError

Ответы (0 шт):