boost::asio::read_async не читает данные из сокета

Я хочу написать свой легковесный простой брокер сообщений типа Kafka. Написал такой код.

Класс сообщения message.h

#pragma once
 
#include <memory>
 
namespace nano
{
    template<typename T>
    struct message
    {
        T body;
 
        static message<T> DEFAULT;
    };
 
    // Forward declare the connection
    template <typename T>
    class connection;
 
    template<typename T>
    struct owned_message
    {
        std::shared_ptr<connection<T>> remote = nullptr;
        message<T> msg;
 
        static owned_message<T> DEFAULT;
    };
}

В message.cpp определены поля DEFAULT, нужные для возврата сообщения "по умолчанию":

#include "message.h"
 
namespace nano
{
    template<typename T>
    message<T> message<T>::DEFAULT = {};
 
    template<typename T>
    owned_message<T> owned_message<T>::DEFAULT = {};
}

Класс соединения connection.h:

#pragma once
 
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>
 
#include "message_queue.h"
#include "message.h"
 
namespace nano
{
    template<typename T>
    class connection : public std::enable_shared_from_this<connection<T>>
    {
    public:
        enum class owner
        {
            SERVER,
            CLIENT
        };
 
        connection(owner parent, boost::asio::io_service& io, boost::asio::ip::tcp::socket socket, message_queue<owned_message<T>>& queue)
            : io(io), socket(std::move(socket)), income(queue)
        {
            owner_type = parent;
        }
 
        void disconnect()
        {
            if (is_connected())
            {
                boost::asio::post(io, [this]() {
                    socket.close();
                });
            }
        }
 
        bool is_connected() const
        {
            return socket.is_open();
        }
 
        void send(const message<T>& msg)
        {
            boost::asio::post(io, [this, msg]() {
                outcome.push(msg);
                send();
            });
        }
 
        void connect()
        {
            if (owner_type == owner::SERVER)
            {
                if (socket.is_open())
                {
                    read();
                }
            }
        }
 
    private:
        void send()
        {
            auto msg = outcome.top();
            boost::asio::async_write(socket, boost::asio::buffer(msg.body, msg.body.size()),
                [this](std::error_code ec, std::size_t length) {
                // TODO
            });
        }
 
        void read()
        {
            boost::asio::streambuf buffer;
            boost::asio::async_read(socket, buffer,
                [this, &buffer](std::error_code ec, std::size_t length) {
                if (!ec)
                {
                    std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl;
                }
            });
        }
 
    private:
        // Контекст Boost ASIO
        boost::asio::io_context& io;
 
        // Каждое соединение имеет уникальный сокет для удаленной части соединения
        boost::asio::ip::tcp::socket socket;
 
        // Ссылка на очередь входящих сообщений родительского объекта
        message_queue<owned_message<T>>& income;
 
        // Очередь хранит сообщения, которые будут отправлены удаленной части соединения
        message_queue<message<T>> outcome;
 
        owner owner_type = owner::SERVER;
    };
}

Интерфейс сервера server_interface.h:

#pragma once
 
#include <iostream>
#include <cstdint>
#include <thread>
#include <memory>
#include <list>
#include <string>
#include <boost/asio.hpp>
 
#include "connection.h"
#include "message_queue.h"
#include "message.h"
 
namespace nano
{
    template<typename T>
    class server_interface
    {
    public:
        server_interface(std::uint16_t port)
            : acceptor(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
        {
 
        }
 
        virtual ~server_interface()
        {
            stop();
        }
 
        bool start()
        {
            try
            {
                listen();
                thread = std::thread([this]() {
                    io.run();
                });
            }
            catch (std::exception& e)
            {
                std::cerr << "[Server] stopped." << std::endl;
                return false;
            }
 
            std::cout << "[Server] started." << std::endl;
            return true;
        }
 
        void stop()
        {
            io.stop();
            if (thread.joinable())
                thread.join();
            std::cout << "[Server] stopped." << std::endl;
        }
 
        void listen()
        {
            acceptor.async_accept([this](std::error_code ec, boost::asio::ip::tcp::socket socket) {
                if (!ec)
                {
                    std::cout << "[Server] new connection: " << socket.remote_endpoint() << std::endl;
 
                    std::shared_ptr<connection<T>> conn =
                        std::make_shared<connection<T>>(connection<T>::owner::SERVER,
                            io, std::move(socket), messages);
 
                    // Даем возможность отклонить соединение
                    if (on_connect(conn))
                    {
                        connections.push_back(std::move(conn));
                        connections.back()->connect();
                    }
                }
 
                listen();
            });
        }
 
        void message_client(std::shared_ptr<connection<T>> client, const message<T>& msg)
        {
            if (client && client->is_connected())
            {
                client->send(msg);
            }
            else
            {
                // Remove dead clients
                connections.erase(std::remove(connections.begin(), connections.end(), nullptr),
                    connections.end());
            }
        }
 
        void update()
        {
            messages.wait();
 
            while (!messages.empty())
            {
                auto msg = messages.top();
                on_message(msg.remote, msg);
                messages.pop();
            }
        }
 
    protected:
        virtual bool on_connect(std::shared_ptr<connection<T>> conn)
        {
            return false;
        }
 
        virtual void on_disconnect(std::shared_ptr<connection<T>> conn)
        {
 
        }
 
        virtual void on_message(std::shared_ptr<connection<T>> conn, const owned_message<T>& msg)
        {
 
        }
 
    protected:
        boost::asio::io_context io;
        boost::asio::ip::tcp::acceptor acceptor;
        std::thread thread;
        std::list<std::shared_ptr<connection<T>>> connections;
        message_queue<owned_message<T>> messages;
    };
}

В очереди сообщений нет ничего особенного:

#pragma once
 
#include <list>
#include <mutex>
#include <memory>
#include <condition_variable>
#include <stdexcept>
 
template<typename Message>
class message_queue
{
public:
    void push(const Message& msg)
    {
        {
            std::unique_lock<std::mutex> lock(mutex);
 
            messages.emplace_front(msg);
        }
        condition.notify_one();
    }
 
    Message& top(int timeout = 0)
    {
        std::unique_lock<std::mutex> lock(mutex);
 
        if (timeout <= 0)
        {
            condition.wait(lock, [this] {
                return !messages.empty();
            });
        }
        else
        {
            auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
                return !messages.empty();
            });
 
            if (timeoutOccured)
                throw std::runtime_error("Timeout");
        }
 
        return messages.front();
    }
 
    void pop()
    {
        std::unique_lock<std::mutex> lock(mutex);
 
        if (!messages.empty())
        {
            messages.pop_front();
        }
    }
 
    bool empty()
    {
        std::unique_lock<std::mutex> lock(mutex);
 
        return messages.empty();
    }
 
    void wait()
    {
        while (messages.empty())
        {
            std::unique_lock<std::mutex> lock(mutex);
            condition.wait(lock);
        }
    }
 
private:
    std::list<Message> messages;
    std::mutex mutex;
    std::condition_variable condition;
};

Реализация сервера получается простая с удобными обработчиками сообщений и соединений:

#include "server_interface.h"
#include "message.h"
 
class message_server : public nano::server_interface<std::string>
{
    using connection_t = std::shared_ptr<nano::connection<std::string>>;
    using message_t = nano::owned_message<std::string>;
 
public:
    message_server(std::uint16_t port)
        : server_interface(port)
    {
 
    }
 
protected:
    bool on_connect(connection_t conn) override
    {
        return true;
    }
 
    void on_disconnect(connection_t conn) override
    {
        std::cout << "Removing client" << std::endl;
    }
 
    void on_message(connection_t conn, const message_t& msg) override
    {
        conn->send(msg.msg);
    }
};
 
int main()
{
    message_server server(60000);
    server.start();
 
    while (true)
    {
        server.update();
    }
}

Проблема в том, что при соединении по телнету с сервером (telnet localhost 60000) сообщения от телнет-клиента не обрабатываются. В коде происходит такой сценарий:

  1. Подключается телнет-клиент

  2. В сервере создается соединение, сохраняется в списке клиентов и сервер начинает читать данные от клиента:

    // Даем возможность отклонить соединение if (on_connect(conn)) { connections.push_back(std::move(conn)); connections.back()->connect(); }

Соединяемся с клиентом:

void connect()
        {
            if (owner_type == owner::SERVER)
            {
                if (socket.is_open())
                {
                    read();
                }
            }
        }

Вот тут возникает проблема:

void read()
        {
            boost::asio::streambuf buffer;
            boost::asio::async_read(socket, buffer,
                [this, &buffer](std::error_code ec, std::size_t length) {
                if (!ec)
                {
                    std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl;
                }
            });
        }

Вызывается метод async_read, но он не получает никакие данные и из него происходит выход. В дальнейшем этот метод больше не вызывается никак. По идее этот метод должен зависнуть и ждать, пока телнет-клиент пришлет данные, но вместо этого происходит выход из этого метода. Почему так?

Как правильно использоватб этот метод, чтобы дождаться данные от телнет-клиента?


Ответы (0 шт):