mutex работает крайне медленно
Всем здравствуйте! Пытаюсь создать параллельный алгоритм на с++. До этого пользовался многопоточностью WinAPI, я использовал класс Event и функции WaitForSingleObject, WaitForMultipleObjects. Сейчас стоит задача переписать тот код, избавив его от WinAPIшных функций. Логика взаимодействия потоков с основным потоком следующая: основной поток выполняет чтение данных, затем дочерние потоки выполняют параллельную обработку этих данных, после чего сохранение данных выполняет основной поток. Приведенный код работает, но проблема в том, что невероятно медленно. Если код с использованием WinAPI работает 3 секунды, то этот код работает аж 12 секунд!
P.S. избавляться от algorithm пробовал, это не дало эффекта абсолютно
#include <iostream>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <algorithm>
int numOfThreads = 4;
std::vector<bool> vstart(numOfThreads, true);
std::mutex mx;
std::condition_variable cv;
int counter = 0;
int yMax = 228, yCurrent = 0;
bool stop = false;
void threadMethod(int it)
{
std::unique_lock<std::mutex> lk(mx);
while (true)
{
// поток ждет сигнала для начала работы
vstart[it] = 0;
cv.wait(lk, [it] {return vstart[it]; });
if (stop)
break;
std::cout << counter << "\n";
}
}
void main()
{
// потоки создаются один раз перед основным циклом
std::vector<std::thread> vt;
for (int i = 0; i < numOfThreads; ++i)
{
vt.emplace_back(threadMethod, i);
}
// основной цикл
for (; yCurrent < yMax; ++yCurrent)
{
while (std::any_of(vstart.begin(), vstart.end(), [](bool n) {return n == true; }))
;
{
// основной цикл выполняет потоконебезопасные действия, пока остальные потоки "спят"
std::lock_guard<std::mutex> lg{ mx };
++counter;
// основной цикл посылает сигналы в дочерние потоки для старта
std::fill(vstart.begin(), vstart.end(), true);
}
cv.notify_all();
}
while (std::any_of(vstart.begin(), vstart.end(), [](int n) {return n == true; }))
;
{
std::lock_guard<std::mutex> lg{ mx };
std::fill(vstart.begin(), vstart.end(), true);
stop = true;
}
cv.notify_all();
for (auto& t : vt)
{
t.join();
}
}
Рабочий код на WinAPI:
#include <iostream>
#include <Windows.h>
int numOfThreads = 4;
HANDLE* hThreads;
HANDLE* hStartEvents;
HANDLE* hDoneEvents;
int counter = 0;
int yMax = 228, yCurrent = 0;
DWORD WINAPI threadMethod(LPVOID lParam)
{
int threadNumber = (int)lParam;
while (yCurrent < yMax)
{
// поток ждет сигнала для начала работы
WaitForSingleObject(hStartEvents[threadNumber], INFINITE);
// сбрасывает флаг начала работы
ResetEvent(hStartEvents[threadNumber]);
// выполняет действия
std::cout << counter << "\n";
// сообщает о завершении
SetEvent(hDoneEvents[threadNumber]);
}
return 0;
}
void main()
{
// -= СОЗДАНИЕ CОБЫТИЙ СТАРТА И ОСТАНОВА =-
hStartEvents = new HANDLE[numOfThreads];
hDoneEvents = new HANDLE[numOfThreads];
for (int i = 0; i < numOfThreads; i++)
{
hStartEvents[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
hDoneEvents[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
}
// -= СОЗДАНИЕ ПОТОКОВ =-
hThreads = new HANDLE[numOfThreads];
for (int i = 0; i < numOfThreads; i++)
{
hThreads[i] = CreateThread(NULL, 0, threadMethod, (LPVOID)i, 0, nullptr);
}
for (yCurrent = 0; yCurrent < yMax; yCurrent++)
{
// основной поток выполняет потоконебезопасные действия
counter++;
// основной поток высылает сигналы начала работы
for (int i = 0; i < numOfThreads; i++) SetEvent(hStartEvents[i]);
// оснвной поток "спит", пока выполняются дочерние потоки
WaitForMultipleObjects(numOfThreads, hDoneEvents, TRUE, INFINITE);
// сброс флагов событий завершения
for (int i = 0; i < numOfThreads; i++) ResetEvent(hDoneEvents[i]);
}
}
Ответы (1 шт):
Не нужно писать пул потоков самостоятельно. Например (решение через future):
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <future>
#include <algorithm>
int counter = 0;
std::mutex mx; // protect std::cout and ::counter.
int numOfThreads = 4;
int yMax = 228, yCurrent = 0;
void threadMethod(int it)
{
{
std::unique_lock<std::mutex> lk(mx);
std::cout << "begin " << counter << " from " << it << "\n";
}
std::this_thread::sleep_for( std::chrono::milliseconds(1));
{
std::unique_lock<std::mutex> lk(mx);
std::cout << "end " << counter << " from " << it << "\n";
}
}
void main()
{
// потоки создаются один раз перед основным циклом
std::vector< std::future<void> > futures;
futures.reserve( numOfThreads );
// основной цикл
for (; yCurrent < yMax; ++yCurrent)
{
{
std::lock_guard<std::mutex> lg{ mx }; // Здесь лочить не обязательно, поскольку все потоки ждут std::async-f , но пусть будет для единообразия.
++counter;
}
for( int it=0; it<numOfThreads; ++it )
futures.push_back( std::async( threadMethod, it ) );
for(auto& f : futures) // wait for all
f.wait(); // get() если нужен результат вычисления из threadMethod
futures.clear();
}
}