mutex работает крайне медленно

Всем здравствуйте! Пытаюсь создать параллельный алгоритм на с++. До этого пользовался многопоточностью WinAPI, я использовал класс Event и функции WaitForSingleObject, WaitForMultipleObjects. Сейчас стоит задача переписать тот код, избавив его от WinAPIшных функций. Логика взаимодействия потоков с основным потоком следующая: основной поток выполняет чтение данных, затем дочерние потоки выполняют параллельную обработку этих данных, после чего сохранение данных выполняет основной поток. Приведенный код работает, но проблема в том, что невероятно медленно. Если код с использованием WinAPI работает 3 секунды, то этот код работает аж 12 секунд!

P.S. избавляться от algorithm пробовал, это не дало эффекта абсолютно

#include <iostream>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <algorithm>


int numOfThreads = 4;
std::vector<bool> vstart(numOfThreads, true);
std::mutex mx;
std::condition_variable cv;
int counter = 0;
int yMax = 228, yCurrent = 0;
bool stop = false;

void threadMethod(int it)
{
    std::unique_lock<std::mutex> lk(mx);
    while (true)
    {
        // поток ждет сигнала для начала работы
        vstart[it] = 0;
        cv.wait(lk, [it] {return vstart[it]; });

        if (stop)
            break;
        std::cout << counter << "\n";
    }
}

void main()
{
    // потоки создаются один раз перед основным циклом
    std::vector<std::thread> vt;
    for (int i = 0; i < numOfThreads; ++i)
    {
        vt.emplace_back(threadMethod, i);
    }

    // основной цикл
    for (; yCurrent < yMax; ++yCurrent)
    {
        while (std::any_of(vstart.begin(), vstart.end(), [](bool n) {return n == true; }))
            ;
        {
            // основной цикл выполняет потоконебезопасные действия, пока остальные потоки "спят"
            std::lock_guard<std::mutex> lg{ mx };
            ++counter;

            // основной цикл посылает сигналы в дочерние потоки для старта
            std::fill(vstart.begin(), vstart.end(), true);
        }
        cv.notify_all();
    }

    while (std::any_of(vstart.begin(), vstart.end(), [](int n) {return n == true; }))
        ;
    {
        std::lock_guard<std::mutex> lg{ mx };
        std::fill(vstart.begin(), vstart.end(), true);
        stop = true;
    }
    cv.notify_all();

    for (auto& t : vt)
    {
        t.join();
    }
}

Рабочий код на WinAPI:

#include <iostream>
#include <Windows.h>

int numOfThreads = 4;

HANDLE* hThreads;
HANDLE* hStartEvents;
HANDLE* hDoneEvents;
int counter = 0;
int yMax = 228, yCurrent = 0;

DWORD WINAPI threadMethod(LPVOID lParam)
{
    int threadNumber = (int)lParam;
    while (yCurrent < yMax)
    {
        // поток ждет сигнала для начала работы
        WaitForSingleObject(hStartEvents[threadNumber], INFINITE);

        // сбрасывает флаг начала работы
        ResetEvent(hStartEvents[threadNumber]);

        // выполняет действия
        std::cout << counter << "\n";

        // сообщает о завершении
        SetEvent(hDoneEvents[threadNumber]);
    }

    return 0;
}

void main()
{
    // -= СОЗДАНИЕ CОБЫТИЙ СТАРТА И ОСТАНОВА =-
    hStartEvents = new HANDLE[numOfThreads];
    hDoneEvents = new HANDLE[numOfThreads];
    for (int i = 0; i < numOfThreads; i++)
    {
        hStartEvents[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
        hDoneEvents[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    // -= СОЗДАНИЕ ПОТОКОВ =-
    hThreads = new HANDLE[numOfThreads];
    for (int i = 0; i < numOfThreads; i++)
    {
        hThreads[i] = CreateThread(NULL, 0, threadMethod, (LPVOID)i, 0, nullptr);
    }

    for (yCurrent = 0; yCurrent < yMax; yCurrent++)
    {
        // основной поток выполняет потоконебезопасные действия
        counter++;

        // основной поток высылает сигналы начала работы
        for (int i = 0; i < numOfThreads; i++) SetEvent(hStartEvents[i]);

        // оснвной поток "спит", пока выполняются дочерние потоки
        WaitForMultipleObjects(numOfThreads, hDoneEvents, TRUE, INFINITE);

        // сброс флагов событий завершения
        for (int i = 0; i < numOfThreads; i++) ResetEvent(hDoneEvents[i]);
    }
}

Ответы (1 шт):

Автор решения: Chorkov

Не нужно писать пул потоков самостоятельно. Например (решение через future):

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <future>
#include <algorithm>



int counter = 0;
std::mutex mx; // protect std::cout and ::counter.

int numOfThreads = 4;
int yMax = 228, yCurrent = 0;



void threadMethod(int it)
{
    {
        std::unique_lock<std::mutex> lk(mx);
        std::cout << "begin " << counter << " from " << it << "\n";
    }

    std::this_thread::sleep_for( std::chrono::milliseconds(1));

    {
        std::unique_lock<std::mutex> lk(mx);
        std::cout << "end " << counter << " from " << it << "\n";
    }
}

void main()
{
    // потоки создаются один раз перед основным циклом

    std::vector< std::future<void> > futures;
    futures.reserve( numOfThreads );


    // основной цикл
    for (; yCurrent < yMax; ++yCurrent)
    {
        {
            std::lock_guard<std::mutex> lg{ mx }; // Здесь лочить не обязательно, поскольку все потоки ждут std::async-f , но пусть будет для единообразия.
            ++counter;
        }

        for( int it=0; it<numOfThreads; ++it )
            futures.push_back(  std::async( threadMethod, it ) );

        for(auto& f : futures) // wait for all
            f.wait(); // get()  если нужен результат вычисления из threadMethod
        futures.clear();    
    }
}
→ Ссылка