Как передать данные по сокету Python

Всём привет, столкнулся с проблемой передачи данных по совету Python.

Как должно работать: Клиент->сервер->Клиент 2

Клиент первый отправляет данные серверу, Сервер обрабатывает отправляет данные Клиенту 2. Всё это происходит в разных потоках

На данный момент работает так: Клиент-сервер-клиент Клиент 2 - сервер- клиент 2

Возможно я с потоками не правильно дружу(

Помогите кто знает


Ответы (1 шт):

Автор решения: Universall

Работа с потоками происходит примерно так: Пока 1й поток ждёт новой информации от клиента, 2й - работает (Одновременно выполнять код они не могут из-за GIL). Выглядит это примерно так:

    def _run(self) -> None:
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, PORT))
        self.server_socket.listen(8)

        while True:  # В бесконечном цикле ждём новых клиентов
            client, address = self.server_socket.accept()  # Принимаем подключение клиента
            thread = threading.Thread(target=self.handle_client, args=(client, address), daemon=True)  # создаём отдельный поток для него
            thread.start()  # Запускаем поток

    def handle_client(self, client_socket: socket.socket, address: tuple) -> None:
        while True:  # В бесконечном цикле ждём сообщений и обрабатываем их как нам нужно
            data = client_socket.recv(1024)
            data = data.decode(encoding=ENCODING)
            if not data:
                break

А как связать 2 клиента через сервер уже дело фантазии. Как пример: через создание пользователей и поиску по id:

import json
import socket
from dataclasses import dataclass
import threading


HOST = 'localhost'
PORT = 8088
ENCODING = 'utf-8'


# можно вынести в отдельный файл
@dataclass
class Client:  # Объект пользователя
    id: int
    client_socket: socket.socket
    name: str


# можно вынести в отдельный файл
def register(client_socket: socket.socket, data: dict, server: 'Server') -> dict:
    client = server.create_user(client_socket, data.get("name", "user"))  # Создаём пользователя и даём ему id
    return {
        "name": client.name,
        "id": client.id
    }


# можно вынести в отдельный файл
def send_message(client_socket, data: dict, server: 'Server') -> dict:
    client_1 = server.connected_clients.get(client_socket)

    if not client_1:  # Проверяем наличие всей нужной нам информации для отправки сообщения (Кто отправил, кому отправил, что отправил)
        return {"error": "user not found"}
    if not (client_2_id := data.get('client_2_id')):
        return {"error": "client_2_id not found"}
    if not (message := data.get("message")):
        return {"error": "message not found"}

    try:
        client_2 = list(filter(lambda x: x.id == int(client_2_id), server.connected_clients.values()))[0]  # Ищем 2го пользователя по id
    except IndexError:
        return {"error": "User to send not found"}

    server.send(client_2.client_socket, {  # Если пользователь найден - отправляем ему сообщение
        "function": "new_message",
        "data": {
            "message": message,
            "sender": {
                "id": client_1.id,
                "name": client_1.name
            },
        }
    })
    return {"success": "ok"}


class Server:
    last_client_id = 0
    functions = {  # Функции, которые может вызвать пользователь
        "register": register,
        "send_message": send_message
    }

    def __init__(self):
        self.connected_clients = {}
        self._run()

    # можно вынести в отдельный файл
    def create_user(self, client_socket: socket.socket, name: str) -> Client:
        client = Client(self.last_client_id, client_socket, name)
        self.connected_clients[client_socket] = client
        self.last_client_id += 1
        return client

    # можно вынести в отдельный файл
    def delete_user(self, client_socket: socket.socket) -> None:
        if self.connected_clients.get(client_socket):
            del self.connected_clients[client_socket]

    # можно вынести в отдельный файл
    def send(self, client_socket: socket.socket, data: dict) -> None:
        client_socket.send(json.dumps(data, indent=4).encode(ENCODING))  # Подготовка данных и их отправка клиенту

    def _run(self) -> None:
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, PORT))
        self.server_socket.listen(8)

        while True:
            client, address = self.server_socket.accept()  # Принимаем подключение клиента
            thread = threading.Thread(target=self.handle_client, args=(client, address), daemon=True)  # создаём отдельный поток для него
            thread.start()  # Запускаем поток

    def handle_client(self, client_socket: socket.socket, address: tuple) -> None:
        while True:  # В бесконечном цикле ждём сообщений и обрабатываем их как нам нужно
            data = client_socket.recv(1024)
            data = data.decode(encoding=ENCODING)

            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                self.send(client_socket, {"error": "json decode error"})
                return

            if not data:
                self.delete_user(client_socket)  # Пользователь отключился
                break

            if func := self.functions.get(data.get("function")):  # Проверяем есть ли нужная пользователю функция в списке доступных функций сервера
                response = func(client_socket, data.get("data", {}), self)  # Если есть, то вызываем её и отдаём результат
                self.send(client_socket, response)
            else:
                self.send(client_socket, {"error": "function not found"})


if __name__ == '__main__':
    app = Server()

P.S: Код достаточно кустарный и недоделанный, но алгоритм, думаю, понятен.

Или можно воспользоваться не потоками, а асинхронностью, что, на мой взгляд, лучше (т.к потоки в больших количествах могут есть много памяти):

import json
import socket
import asyncio
from dataclasses import dataclass


HOST = 'localhost'
PORT = 8088
ENCODING = 'utf-8'


@dataclass
class Client:
    id: int
    client_socket: socket.socket
    name: str


async def register(client_socket: socket.socket, data: dict, server: 'Server') -> dict:
    client = server.create_user(client_socket, data.get("name", "user"))
    return {
        "name": client.name,
        "id": client.id
    }


async def send_message(client_socket, data: dict, server: 'Server') -> dict:
    client_1 = server.connected_clients.get(client_socket)

    if not client_1:
        return {"error": "user not found"}
    if not (client_2_id := data.get('client_2_id')):
        return {"error": "client_2_id not found"}
    if not (message := data.get("message")):
        return {"error": "message not found"}

    try:
        client_2 = list(filter(lambda x: x.id == int(client_2_id), server.connected_clients.values()))[0]
    except IndexError:
        return {"error": "User to send not found"}

    server.send(client_2.client_socket, {
        "function": "new_message",
        "data": {
            "message": message,
            "sender": {
                "id": client_1.id,
                "name": client_1.name
            },
        }
    })
    return {"success": "ok"}


class Server:
    last_client_id = 0
    functions = {
        "register": register,
        "send_message": send_message
    }

    def __init__(self):
        loop = asyncio.get_event_loop()
        self.connected_clients = {}
        loop.run_until_complete(self._run())  # Запускаем сервер

    def create_user(self, client_socket: socket.socket, name: str) -> Client:
        client = Client(self.last_client_id, client_socket, name)
        self.connected_clients[client_socket] = client
        self.last_client_id += 1
        return client

    def delete_user(self, client_socket: socket.socket) -> None:
        if self.connected_clients.get(client_socket):
            del self.connected_clients[client_socket]

    def send(self, client_socket: socket.socket, data: dict) -> None:
        client_socket.send(json.dumps(data, indent=4).encode(ENCODING))

    async def _run(self) -> None:
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((HOST, PORT))
        self.server_socket.listen(8)
        self.server_socket.setblocking(False)  # Убираем блокировку

        loop = asyncio.get_event_loop()

        while True:
            client, address = await loop.sock_accept(self.server_socket)  # Асинхронно ждём пользователей
            loop.create_task(self.handle_client(client, address))   # Создаём задачу под пользователя

    async def handle_client(self, client_socket: socket.socket, address: tuple) -> None:
        loop = asyncio.get_event_loop()
        while True:
            data = await loop.sock_recv(client_socket, 1024)  # Асинхронно ждём данные
            data = data.decode(encoding=ENCODING)

            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                self.send(client_socket, {"error": "json decode error"})
                return

            if not data:
                self.delete_user(client_socket)
                break

            if func := self.functions.get(data.get("function")):
                response = await func(client_socket, data.get("data", {}), self)
                self.send(client_socket, response)
            else:
                self.send(client_socket, {"error": "function not found"})

Клиент:

import json
import socket
import threading

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8088))


class Listener(threading.Thread):  # Отдельный поток для поиска и вывода новых сообщений
    def run(self) -> None:
        while True:
            from_server = client.recv(1024)
            print(from_server)


def send(request: dict) -> None:
    client.send(json.dumps(request).encode(encoding='utf-8'))


listener = Listener(daemon=True)
listener.start()

send({  # Регистрация
    "function": "register",
    "data": {
        "name": input("Name: ")
    }
})

while True:  # Отправка
    message = input("message: ")

    send({
        "function": "send_message",
        "data": {
            "client_2_id": input("to: "),
            "message": message
        }
    })

→ Ссылка