Написать многопоточную программу для пакетной обработки команд

Мне нужно написать программу для пакетной обработки комманд, так, чтобы

  1. команды обрабатывались многопоточно
  2. ввод данных управлялся внешним кодом.

Многопоточная обработка команд При старте программы должно быть создано в дополнение к существующему основному потоку ещё три дополнительных. Дадим им условные имена: main – основной поток (в пользовательском приложении)

  1. log – поток для вывода данных в консоль
  2. file1 – первый поток для вывода в файл
  3. file2 – второй поток для вывода в файл Основная логика обработки меняется таким образом, что блок команд после своего формирования должен быть отправлен в консоль (потоком log) и сразу в файл (одним из потоков file1 или file2).

При этом отправка блока в файл распределяется между файловыми потоками. Можно напрямую отправлять, например, чётные команды через поток file1, а нечётные – через file2. Но лучшим решение станет использование единой очереди команд, которую будут обрабатывать оба файловых потока одновременно.

Я написал такой код:

Класс для чтения данных с консоли:

#pragma once

#include <vector>
#include <algorithm>
#include <string>
#include <iostream>

#include "command.h"
#include "message_queue.h"

class background_reader
{
public:
    background_reader(message_queue& queue)
        : queue(queue)
    {

    }

    void operator()()
    {
        std::string cmd;
        while (std::getline(std::cin, cmd))
        {
            if (!cmd.empty())
            {
                queue.put(command(std::time(nullptr), cmd));
                std::cout << "put: " << cmd << std::endl;
            }
        }
    }

private:
    message_queue& queue;
};

Класс для печати команд в консоль:

#pragma once

#include <iostream>
#include <fstream>

#include "message_queue.h"
#include "command.h"

class background_logger
{
public:
    background_logger(message_queue& queue)
        : queue(queue)
    {

    }

    void operator()()
    {
        while (true)
        {
            auto cmd = queue.try_get();
            std::cout << cmd->body();
        }
    }

private:
    message_queue& queue;
};

Класс команды:

#pragma once

#include <string>
#include <ctime>
#include <memory>

class command
{
public:
    command()
        : timestamp(0), cmd(std::string())
    {

    }

    command(std::time_t time, std::string& body)
        : timestamp(time), cmd(body)
    {

    }

    std::time_t time() const
    {
        return timestamp;
    }

    std::string body() const
    {
        return cmd;
    }

    bool isOpenScopeCommand() const
    {
        return cmd == "{";
    }

    bool isCloseScopeCommand() const
    {
        return cmd == "}";
    }

    std::unique_ptr<command> move()
    {
        return std::unique_ptr<command>(new command(std::move(*this)));
    }

private:
    std::time_t timestamp;
    std::string cmd;
};

Очередь сообщений:

#pragma once

#include <functional>
#include <queue>
#include <mutex>
#include <memory>
#include <condition_variable>

#include "command.h"

class message_queue
{
public:
    message_queue() = default;

    ~message_queue() = default;

    void put(command&& cmd)
    {
        {
            std::unique_lock<std::mutex> lock(mutex);
            q.push(cmd.move());
        }

        condition.notify_one();
    }
    std::unique_ptr<command> get(int timeout = 0)
    {
        std::unique_lock<std::mutex> lock(mutex);

        if (timeout <= 0)
        {
            condition.wait(lock, [this] {
                return !q.empty();
            });
        }
        else
        {
            auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
                return !q.empty();
            });

            if (timeoutOccured)
                return nullptr;
        }

        auto cmd = q.front()->move();
        q.pop();
        return cmd;
    }

    std::unique_ptr<command> try_get()
    {
        std::lock_guard<std::mutex> lock(mutex);

        if (!q.empty())
        {
            auto msg = q.front()->move();
            q.pop();
            return msg;
        }
        else
        {
            return { nullptr };
        }
    }

private:
    std::queue<std::unique_ptr<command>> q;
    std::mutex mutex;
    std::condition_variable condition;
};

Функция main:

#include <iostream>
#include <thread>

#include "message_queue.h"
#include "console_reader.h"
#include "console_logger.h"

int main()
{
    message_queue queue;
    background_reader reader(queue);
    background_logger logger(queue);

    auto reader_thread = std::thread(reader);
    auto logger_thread = std::thread(logger);

    if (reader_thread.joinable())
        reader_thread.join();

    if (logger_thread.joinable())
        logger_thread.join();
    return 0;
}
  1. Почему эта программа либо ничего не печатает в консоль, либо крашится с сегфолтом? Программа крашится на этой строчке:

    std::string cmd; while (std::getline(std::cin, cmd))

  2. Как придумать архитектуру этой программы, чтобы она была легко расширяемой и легкой для понимания? Например, для того, чтобы можно было сделать эту программу сетевой.

  3. Как сделать управление этой программой при помощи внешнего кода?


Ответы (0 шт):