Написать многопоточную программу для пакетной обработки команд
Мне нужно написать программу для пакетной обработки комманд, так, чтобы
- команды обрабатывались многопоточно
- ввод данных управлялся внешним кодом.
Многопоточная обработка команд При старте программы должно быть создано в дополнение к существующему основному потоку ещё три дополнительных. Дадим им условные имена: main – основной поток (в пользовательском приложении)
- log – поток для вывода данных в консоль
- file1 – первый поток для вывода в файл
- file2 – второй поток для вывода в файл Основная логика обработки меняется таким образом, что блок команд после своего формирования должен быть отправлен в консоль (потоком log) и сразу в файл (одним из потоков file1 или file2).
При этом отправка блока в файл распределяется между файловыми потоками. Можно напрямую отправлять, например, чётные команды через поток file1, а нечётные – через file2. Но лучшим решение станет использование единой очереди команд, которую будут обрабатывать оба файловых потока одновременно.
Я написал такой код:
Класс для чтения данных с консоли:
#pragma once
#include <vector>
#include <algorithm>
#include <string>
#include <iostream>
#include "command.h"
#include "message_queue.h"
class background_reader
{
public:
background_reader(message_queue& queue)
: queue(queue)
{
}
void operator()()
{
std::string cmd;
while (std::getline(std::cin, cmd))
{
if (!cmd.empty())
{
queue.put(command(std::time(nullptr), cmd));
std::cout << "put: " << cmd << std::endl;
}
}
}
private:
message_queue& queue;
};
Класс для печати команд в консоль:
#pragma once
#include <iostream>
#include <fstream>
#include "message_queue.h"
#include "command.h"
class background_logger
{
public:
background_logger(message_queue& queue)
: queue(queue)
{
}
void operator()()
{
while (true)
{
auto cmd = queue.try_get();
std::cout << cmd->body();
}
}
private:
message_queue& queue;
};
Класс команды:
#pragma once
#include <string>
#include <ctime>
#include <memory>
class command
{
public:
command()
: timestamp(0), cmd(std::string())
{
}
command(std::time_t time, std::string& body)
: timestamp(time), cmd(body)
{
}
std::time_t time() const
{
return timestamp;
}
std::string body() const
{
return cmd;
}
bool isOpenScopeCommand() const
{
return cmd == "{";
}
bool isCloseScopeCommand() const
{
return cmd == "}";
}
std::unique_ptr<command> move()
{
return std::unique_ptr<command>(new command(std::move(*this)));
}
private:
std::time_t timestamp;
std::string cmd;
};
Очередь сообщений:
#pragma once
#include <functional>
#include <queue>
#include <mutex>
#include <memory>
#include <condition_variable>
#include "command.h"
class message_queue
{
public:
message_queue() = default;
~message_queue() = default;
void put(command&& cmd)
{
{
std::unique_lock<std::mutex> lock(mutex);
q.push(cmd.move());
}
condition.notify_one();
}
std::unique_ptr<command> get(int timeout = 0)
{
std::unique_lock<std::mutex> lock(mutex);
if (timeout <= 0)
{
condition.wait(lock, [this] {
return !q.empty();
});
}
else
{
auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
return !q.empty();
});
if (timeoutOccured)
return nullptr;
}
auto cmd = q.front()->move();
q.pop();
return cmd;
}
std::unique_ptr<command> try_get()
{
std::lock_guard<std::mutex> lock(mutex);
if (!q.empty())
{
auto msg = q.front()->move();
q.pop();
return msg;
}
else
{
return { nullptr };
}
}
private:
std::queue<std::unique_ptr<command>> q;
std::mutex mutex;
std::condition_variable condition;
};
Функция main:
#include <iostream>
#include <thread>
#include "message_queue.h"
#include "console_reader.h"
#include "console_logger.h"
int main()
{
message_queue queue;
background_reader reader(queue);
background_logger logger(queue);
auto reader_thread = std::thread(reader);
auto logger_thread = std::thread(logger);
if (reader_thread.joinable())
reader_thread.join();
if (logger_thread.joinable())
logger_thread.join();
return 0;
}
Почему эта программа либо ничего не печатает в консоль, либо крашится с сегфолтом? Программа крашится на этой строчке:
std::string cmd; while (std::getline(std::cin, cmd))
Как придумать архитектуру этой программы, чтобы она была легко расширяемой и легкой для понимания? Например, для того, чтобы можно было сделать эту программу сетевой.
Как сделать управление этой программой при помощи внешнего кода?