Python. Отправка файла по TCP сокету

Если вкратце, то вопрос стоит так: "Как быстрее всего отправить файл по TCP сокету ?"

А если поподробнее, изначально я пришел к выводу что отправлять в одном потоке это долго, поэтому решил поискать в интернете готовые решения по многопоточной отправке файла, но толковой информации не нашлось, поэтому я принялся применять знания на практике, грубо говоря.

Вкратце мой вариант программы делает следующее.

Для начала он получает необходимую для работы информацию о файле.

Например вычисляет, так называемую, контрольную сумму, узнает его размер, узнает количество блоков которые необходимо будет отправить.

Так как отправлять файл нужно будет в многопоточном режиме, то файл нужно поделить на блоки, я думаю это понятно, размером блока я выбрал максимальное (почти) количество байт которые можно отправить за один вызов sock.send(). Почти, потому что в начало блока необходимо добавить его порядковый номер в файле.

Скажем если это будет третий по счету блок размером 65.000 байт, то порядковый номер будет 3, а сам блок данных будет выглядеть так:

file_block = pack(">Q", num)+f.read(65000)

Где num - это порядковый номер.

Порядковый номер нужен для того, чтобы принимающая сторона понимала в какую часть файла нужно записывать блок.

Скажем если это опять же блок 3, размером 65000 байт, то запись такого блока в файл будет выглядеть следующим образом:

data = sock.recv(65008)
num = unpack(">Q", data[0:8])[0]
f.seek(num*65000)
f.write(data[8:]

Вызывая recv() мы ожидаем 65008 байт потому что размер long long int - 8.

Далее чтобы разместить кусок данных в нужном месте мы перемещаем указатель файла вызывая f.seek(), ну и собственно записываем данные.

Как вы уже могли догадаться, порой случается дробление пакетов, и принимающая сторона не может распаковать порядковый номер, потому что она получила не целый пакет, а часть другого пакета.

Когда пакет получается распаковать, я записываю его порядковый номер, и после получения всех пакетов, я проверяю какие не были получены верно. Нужно это для того чтобы запросить их отправку снова.

Собственно вопросы у меня следующие.

Нормально ли создавать такое количество потоков (при отправке файла размером 4гб их будет создано около 70к), быстрее ли это чем скажем отправка большего количества данных но с использованием потоков количество которых равно количеству ядер процессора, ну и в целом ваше мнение касаемо реализации, предложения как сделать лучше приветствуются.

Если для понимания того что я понаписал нужен весь код, то вот он

from os import cpu_count, path
import socket
from datetime import datetime
from threading import Thread
from hashlib import sha256
from sys import argv
from struct import pack, unpack

#
# SHA256 hash of example file "test.dat"
#
# 50131c5965bd8576ef001d5f680576145159d9300eda2f03513c1f950c36ffa2
# OR
# sha256(open("test.dat", "rb").read()).hexdigest()
#
# Size of "test.dat"
# 4.687.587.984 bytes
#
# Size of BLOCK
# 65.000 bytes
#
# Count of blocks is not equally divided by the file size.
#
# 4687587984/65000=72.116,7382
#
# So the count of blocks will be (Size of file/Size of BLOCK)+1
#
# Block count
# 72117
#
# Block count is equal to the count of running threads, 1 thread for 1 block.
#

def get_file_chunk(sock, f, err, good):

    try:
        data = sock.recv(65008)
        #print(len(data))
        #print(data[0:15])
        num = unpack(">Q", data[0:8])[0]
        #print(num)
        f.seek(num*65000)
        f.write(data[8:])
        f.seek(0)
        good.append(num)
    except Exception as error:
        print("Bad block, %s" % (data[0:15]))
        err.append(1)

def writin_file():

    bad = []
    good = []
    threads = []
    bad_block = []

    sock = socket.socket()
    sock.connect(("192.168.1.210", 9090))

    print("[+] Connected")
    f = open("test.fin", "wb")
    
    for x in range(0, 72117):
        th = Thread(target=get_file_chunk, args=(sock, f, bad, good))
        threads.append(th)
    print("[+] Waiting...")
    for th in threads:
        th.start()
    for th in threads:
        th.join()

    for x in range(0, 72117):
        if x not in good:
            bad_block.append(x)
    print("Bad block len %i" % len(bad_block))
    f.close()
    #print("hash: %s" % (sha256(open("test.fin", "rb").read()).hexdigest()))

def get_block_count(fsize, bsize):

    res = fsize/bsize
    if res % 1 != 0:
        res = int(res+1)
    return res

def send_file_chunk(sock, chunk):

    sock.send(chunk)

def send_file(sock):

    fname = "test.dat"
    fsize = path.getsize(fname)
    bsize = 65000
    bcount = get_block_count(fsize, bsize)
    hash = "50131c5965bd8576ef001d5f680576145159d9300eda2f03513c1f950c36ffa2"

    print(fname, fsize, bsize, bcount, hash)

    range_bcount = bcount+1
    f = open(fname, "rb")

    for x in range(0, range_bcount):

        f.seek(x*bsize)
        data = f.read(65000)
        chunk = pack(">Q", x) + data
        #print(len(chunk))
        Thread(target=send_file_chunk, args=(sock, chunk)).start()
        f.seek(0)
        #print("[+] Data:%s, unpack chunk number:%i" % (msg[0:15], unpack(">Q", msg[0:8])[0]))

    print("[+] Sent.")
    f.close()

def wait_server():

    s = socket.socket()
    s.bind(("0.0.0.0", 9090))
    s.listen()
    sock, addr = s.accept()
    print("[+] Connected to %s" % addr[0])

    return sock

Ответы (0 шт):