C++ синхронизация потоков

Читаю строки из файла, затем делаю очередь тасок, в таске несколько строк, затем запускаю несколько потоков для сортировки строк в этих тасках, далее в mergeSortedChunks() надо отсортировать уже отсортированные строки, которые находятся в этих тасках (там просто сортировка, но потом будет переделана под сортировку слиянием, проблема не в этом), далее это все запишется в файл. При выполнении программы возникла проблема, как мне кажется, это все из-за неправильной синхронизации, жалуется на ошибку чтения при выполнении processTasks(). Помогите пожалуйста, на С++ писал прям очень мало, так тут еще и потоки.

#include <iostream>
#include <fstream>
#include <vector>
#include <queue>
#include <windows.h>
#include <algorithm>
#include <string>

using namespace std;

const char* filename = "input.txt";
const int numThreads = 4;

struct SortTask {
    vector<string> lines;
};

queue<SortTask> tasks;
vector<SortTask> completedTasks;
CRITICAL_SECTION cs1, cs2;
CONDITION_VARIABLE cv1, cv2;

void pushTask(const SortTask& task) {
    EnterCriticalSection(&cs1);
    tasks.push(task);
    LeaveCriticalSection(&cs1);
    WakeConditionVariable(&cv1);
}

bool popTask(SortTask* task) {
    EnterCriticalSection(&cs2);
    if(tasks.empty())
    {
        SleepConditionVariableCS(&cv2, &cs2, INFINITE);
        return false;
    }
    SortTask taskfromtasks = tasks.front();
    tasks.pop();
    *task = taskfromtasks;
    LeaveCriticalSection(&cs2);
    return true;
}

DWORD WINAPI processTasks(LPVOID lpParam) 
{
    SortTask task;
    while (popTask(&task))
    {
        sort(task.lines.begin(), task.lines.end());
        completedTasks.push_back(task);
    }
    return 0;
}

void readAndSplitFile(char* filename) 
{
    ifstream file(filename);
    vector<string> lines;
    string line;
    while (getline(file, line)) 
    {
        lines.push_back(line);
        if (lines.size() == numThreads) 
        {
            pushTask({ lines });
            lines.clear();
        }
    }

    if (!lines.empty()) {
        pushTask({ lines });
    }

    file.close();
}

void mergeSortedChunks() 
{
    vector<string> sortedLines;
    for (const auto& chunk : completedTasks) 
    {
        sortedLines.insert(sortedLines.end(), chunk.lines.begin(), chunk.lines.end());
    }
    sort(sortedLines.begin(), sortedLines.end());

    ofstream outputFile("sorted_output.txt");
    for (const auto& line : sortedLines) 
    {
        outputFile << line << endl;
    }
    outputFile.close();
}

int main() {

    InitializeCriticalSection(&cs1);
    InitializeConditionVariable(&cv1);
    InitializeCriticalSection(&cs2);
    InitializeConditionVariable(&cv2);

    HANDLE inputThread = CreateThread(NULL, 0, LPTHREAD_START_ROUTINE(readAndSplitFile), (LPVOID)filename, 0, NULL);
    WaitForSingleObject(inputThread, INFINITE);
    HANDLE threadHandles[numThreads];
    for (int i = 0; i < numThreads; ++i) 
    {
        threadHandles[i] = CreateThread(NULL, 0, processTasks, NULL, 0, NULL);
    }
    WaitForMultipleObjects(numThreads, threadHandles, true, INFINITE);
    HANDLE outputThread = CreateThread(NULL, 0, LPTHREAD_START_ROUTINE(mergeSortedChunks), 0, 0, NULL);

    WaitForSingleObject(outputThread, INFINITE);

    DeleteCriticalSection(&cs1);
    DeleteCriticalSection(&cs2);
    CloseHandle(inputThread);
    CloseHandle(outputThread);
    for (int i = 0; i < numThreads; ++i) 
    {
        CloseHandle(threadHandles[i]);
    }

    return 0;
}


Ответы (0 шт):