C++ синхронизация потоков
Читаю строки из файла, затем делаю очередь тасок, в таске несколько строк, затем запускаю несколько потоков для сортировки строк в этих тасках, далее в mergeSortedChunks() надо отсортировать уже отсортированные строки, которые находятся в этих тасках (там просто сортировка, но потом будет переделана под сортировку слиянием, проблема не в этом), далее это все запишется в файл. При выполнении программы возникла проблема, как мне кажется, это все из-за неправильной синхронизации, жалуется на ошибку чтения при выполнении processTasks(). Помогите пожалуйста, на С++ писал прям очень мало, так тут еще и потоки.
#include <iostream>
#include <fstream>
#include <vector>
#include <queue>
#include <windows.h>
#include <algorithm>
#include <string>
using namespace std;
const char* filename = "input.txt";
const int numThreads = 4;
struct SortTask {
vector<string> lines;
};
queue<SortTask> tasks;
vector<SortTask> completedTasks;
CRITICAL_SECTION cs1, cs2;
CONDITION_VARIABLE cv1, cv2;
void pushTask(const SortTask& task) {
EnterCriticalSection(&cs1);
tasks.push(task);
LeaveCriticalSection(&cs1);
WakeConditionVariable(&cv1);
}
bool popTask(SortTask* task) {
EnterCriticalSection(&cs2);
if(tasks.empty())
{
SleepConditionVariableCS(&cv2, &cs2, INFINITE);
return false;
}
SortTask taskfromtasks = tasks.front();
tasks.pop();
*task = taskfromtasks;
LeaveCriticalSection(&cs2);
return true;
}
DWORD WINAPI processTasks(LPVOID lpParam)
{
SortTask task;
while (popTask(&task))
{
sort(task.lines.begin(), task.lines.end());
completedTasks.push_back(task);
}
return 0;
}
void readAndSplitFile(char* filename)
{
ifstream file(filename);
vector<string> lines;
string line;
while (getline(file, line))
{
lines.push_back(line);
if (lines.size() == numThreads)
{
pushTask({ lines });
lines.clear();
}
}
if (!lines.empty()) {
pushTask({ lines });
}
file.close();
}
void mergeSortedChunks()
{
vector<string> sortedLines;
for (const auto& chunk : completedTasks)
{
sortedLines.insert(sortedLines.end(), chunk.lines.begin(), chunk.lines.end());
}
sort(sortedLines.begin(), sortedLines.end());
ofstream outputFile("sorted_output.txt");
for (const auto& line : sortedLines)
{
outputFile << line << endl;
}
outputFile.close();
}
int main() {
InitializeCriticalSection(&cs1);
InitializeConditionVariable(&cv1);
InitializeCriticalSection(&cs2);
InitializeConditionVariable(&cv2);
HANDLE inputThread = CreateThread(NULL, 0, LPTHREAD_START_ROUTINE(readAndSplitFile), (LPVOID)filename, 0, NULL);
WaitForSingleObject(inputThread, INFINITE);
HANDLE threadHandles[numThreads];
for (int i = 0; i < numThreads; ++i)
{
threadHandles[i] = CreateThread(NULL, 0, processTasks, NULL, 0, NULL);
}
WaitForMultipleObjects(numThreads, threadHandles, true, INFINITE);
HANDLE outputThread = CreateThread(NULL, 0, LPTHREAD_START_ROUTINE(mergeSortedChunks), 0, 0, NULL);
WaitForSingleObject(outputThread, INFINITE);
DeleteCriticalSection(&cs1);
DeleteCriticalSection(&cs2);
CloseHandle(inputThread);
CloseHandle(outputThread);
for (int i = 0; i < numThreads; ++i)
{
CloseHandle(threadHandles[i]);
}
return 0;
}