boost::asio::read_async не читает данные из сокета
Я хочу написать свой легковесный простой брокер сообщений типа Kafka. Написал такой код.
Класс сообщения message.h
#pragma once
#include <memory>
namespace nano
{
template<typename T>
struct message
{
T body;
static message<T> DEFAULT;
};
// Forward declare the connection
template <typename T>
class connection;
template<typename T>
struct owned_message
{
std::shared_ptr<connection<T>> remote = nullptr;
message<T> msg;
static owned_message<T> DEFAULT;
};
}
В message.cpp определены поля DEFAULT, нужные для возврата сообщения "по умолчанию":
#include "message.h"
namespace nano
{
template<typename T>
message<T> message<T>::DEFAULT = {};
template<typename T>
owned_message<T> owned_message<T>::DEFAULT = {};
}
Класс соединения connection.h:
#pragma once
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>
#include "message_queue.h"
#include "message.h"
namespace nano
{
template<typename T>
class connection : public std::enable_shared_from_this<connection<T>>
{
public:
enum class owner
{
SERVER,
CLIENT
};
connection(owner parent, boost::asio::io_service& io, boost::asio::ip::tcp::socket socket, message_queue<owned_message<T>>& queue)
: io(io), socket(std::move(socket)), income(queue)
{
owner_type = parent;
}
void disconnect()
{
if (is_connected())
{
boost::asio::post(io, [this]() {
socket.close();
});
}
}
bool is_connected() const
{
return socket.is_open();
}
void send(const message<T>& msg)
{
boost::asio::post(io, [this, msg]() {
outcome.push(msg);
send();
});
}
void connect()
{
if (owner_type == owner::SERVER)
{
if (socket.is_open())
{
read();
}
}
}
private:
void send()
{
auto msg = outcome.top();
boost::asio::async_write(socket, boost::asio::buffer(msg.body, msg.body.size()),
[this](std::error_code ec, std::size_t length) {
// TODO
});
}
void read()
{
boost::asio::streambuf buffer;
boost::asio::async_read(socket, buffer,
[this, &buffer](std::error_code ec, std::size_t length) {
if (!ec)
{
std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl;
}
});
}
private:
// Контекст Boost ASIO
boost::asio::io_context& io;
// Каждое соединение имеет уникальный сокет для удаленной части соединения
boost::asio::ip::tcp::socket socket;
// Ссылка на очередь входящих сообщений родительского объекта
message_queue<owned_message<T>>& income;
// Очередь хранит сообщения, которые будут отправлены удаленной части соединения
message_queue<message<T>> outcome;
owner owner_type = owner::SERVER;
};
}
Интерфейс сервера server_interface.h:
#pragma once
#include <iostream>
#include <cstdint>
#include <thread>
#include <memory>
#include <list>
#include <string>
#include <boost/asio.hpp>
#include "connection.h"
#include "message_queue.h"
#include "message.h"
namespace nano
{
template<typename T>
class server_interface
{
public:
server_interface(std::uint16_t port)
: acceptor(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
}
virtual ~server_interface()
{
stop();
}
bool start()
{
try
{
listen();
thread = std::thread([this]() {
io.run();
});
}
catch (std::exception& e)
{
std::cerr << "[Server] stopped." << std::endl;
return false;
}
std::cout << "[Server] started." << std::endl;
return true;
}
void stop()
{
io.stop();
if (thread.joinable())
thread.join();
std::cout << "[Server] stopped." << std::endl;
}
void listen()
{
acceptor.async_accept([this](std::error_code ec, boost::asio::ip::tcp::socket socket) {
if (!ec)
{
std::cout << "[Server] new connection: " << socket.remote_endpoint() << std::endl;
std::shared_ptr<connection<T>> conn =
std::make_shared<connection<T>>(connection<T>::owner::SERVER,
io, std::move(socket), messages);
// Даем возможность отклонить соединение
if (on_connect(conn))
{
connections.push_back(std::move(conn));
connections.back()->connect();
}
}
listen();
});
}
void message_client(std::shared_ptr<connection<T>> client, const message<T>& msg)
{
if (client && client->is_connected())
{
client->send(msg);
}
else
{
// Remove dead clients
connections.erase(std::remove(connections.begin(), connections.end(), nullptr),
connections.end());
}
}
void update()
{
messages.wait();
while (!messages.empty())
{
auto msg = messages.top();
on_message(msg.remote, msg);
messages.pop();
}
}
protected:
virtual bool on_connect(std::shared_ptr<connection<T>> conn)
{
return false;
}
virtual void on_disconnect(std::shared_ptr<connection<T>> conn)
{
}
virtual void on_message(std::shared_ptr<connection<T>> conn, const owned_message<T>& msg)
{
}
protected:
boost::asio::io_context io;
boost::asio::ip::tcp::acceptor acceptor;
std::thread thread;
std::list<std::shared_ptr<connection<T>>> connections;
message_queue<owned_message<T>> messages;
};
}
В очереди сообщений нет ничего особенного:
#pragma once
#include <list>
#include <mutex>
#include <memory>
#include <condition_variable>
#include <stdexcept>
template<typename Message>
class message_queue
{
public:
void push(const Message& msg)
{
{
std::unique_lock<std::mutex> lock(mutex);
messages.emplace_front(msg);
}
condition.notify_one();
}
Message& top(int timeout = 0)
{
std::unique_lock<std::mutex> lock(mutex);
if (timeout <= 0)
{
condition.wait(lock, [this] {
return !messages.empty();
});
}
else
{
auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
return !messages.empty();
});
if (timeoutOccured)
throw std::runtime_error("Timeout");
}
return messages.front();
}
void pop()
{
std::unique_lock<std::mutex> lock(mutex);
if (!messages.empty())
{
messages.pop_front();
}
}
bool empty()
{
std::unique_lock<std::mutex> lock(mutex);
return messages.empty();
}
void wait()
{
while (messages.empty())
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
}
}
private:
std::list<Message> messages;
std::mutex mutex;
std::condition_variable condition;
};
Реализация сервера получается простая с удобными обработчиками сообщений и соединений:
#include "server_interface.h"
#include "message.h"
class message_server : public nano::server_interface<std::string>
{
using connection_t = std::shared_ptr<nano::connection<std::string>>;
using message_t = nano::owned_message<std::string>;
public:
message_server(std::uint16_t port)
: server_interface(port)
{
}
protected:
bool on_connect(connection_t conn) override
{
return true;
}
void on_disconnect(connection_t conn) override
{
std::cout << "Removing client" << std::endl;
}
void on_message(connection_t conn, const message_t& msg) override
{
conn->send(msg.msg);
}
};
int main()
{
message_server server(60000);
server.start();
while (true)
{
server.update();
}
}
Проблема в том, что при соединении по телнету с сервером (telnet localhost 60000) сообщения от телнет-клиента не обрабатываются. В коде происходит такой сценарий:
Подключается телнет-клиент
В сервере создается соединение, сохраняется в списке клиентов и сервер начинает читать данные от клиента:
// Даем возможность отклонить соединение if (on_connect(conn)) { connections.push_back(std::move(conn)); connections.back()->connect(); }
Соединяемся с клиентом:
void connect()
{
if (owner_type == owner::SERVER)
{
if (socket.is_open())
{
read();
}
}
}
Вот тут возникает проблема:
void read()
{
boost::asio::streambuf buffer;
boost::asio::async_read(socket, buffer,
[this, &buffer](std::error_code ec, std::size_t length) {
if (!ec)
{
std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl;
}
});
}
Вызывается метод async_read, но он не получает никакие данные и из него происходит выход. В дальнейшем этот метод больше не вызывается никак. По идее этот метод должен зависнуть и ждать, пока телнет-клиент пришлет данные, но вместо этого происходит выход из этого метода. Почему так?
Как правильно использоватб этот метод, чтобы дождаться данные от телнет-клиента?