Доступ к FTP ФНС через python
Мне необходимо написать небольшой сервис на python, который будет раз в сутки подключаться к FTP серверу ФНС и оттуда забирать xml с данными, парсить их и класть данные в бд. Но на моменте подключения к FTP серверу подключение отлетает по таймауту. Из того что я имею это файл с сертификатами *.p12, пароль к нему и адрес самого фтп.
Вот так нарезаю файл:
from cryptography.hazmat.primitives.serialization import Encoding, pkcs12, PrivateFormat, NoEncryption
def p12_to_pem(pfx_path, pfx_password):
""" Decrypts the .p12 file to be used with requests. """
with open('all.pem', 'w') as t_pem:
f_pem = open(t_pem.name, 'wb')
pfx = open(pfx_path, 'rb').read()
p12 = pkcs12.load_pkcs12(pfx, pfx_password)
f_pem.write(p12.key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
f_pem.write(p12.cert.certificate.public_bytes(Encoding.PEM))
a_d = p12.additional_certs
if a_d is not None:
for cert in a_d:
f_pem.write(cert.certificate.public_bytes(Encoding.PEM))
f_pem.close()
return t_pem.name
Затем работаю с этим файлом вот так:
from ftplib import FTP_TLS
import ssl
import socket
HOSTNAME = 'ftp.egrul.nalog.ru'
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations(p12_to_pem(<*.p12>, <pass>))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock:
with context.wrap_socket(sock, server_hostname=HOSTNAME) as ssock:
with FTP_TLS(context=ssock.context) as ftps:
ftps.connect(HOSTNAME)
ftps.login()
ftps.prot_p()
ftps.nlst()
Можете подсказать пожалуйста что я делаю не так? Или может кто то уже решал такую задачу по подключению к FTP ФНС?
Ответы (1 шт):
Автор решения: Артемис
→ Ссылка
В итоге ларчик открывался просто. Не совсем то что я думал но вот так все работает:
from requests import Session
from requests_pkcs12 import Pkcs12Adapter
HOSTNAME = '<host>'
CERT = '<*.p12>'
with Session() as s:
s.mount('https://ftp.egrul.nalog.ru/', Pkcs12Adapter(pkcs12_filename=CERT, pkcs12_password='<pass>'))
r = s.get(HOSTNAME)
with open("test.zip", "wb") as zip_f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
zip_f.write(chunk)