Как правильно распараллелить поразрядную сортировку?
мне нужно организовать параллельную сортировку с применением технологи MPI. С последовательным алгоритмом я разобрался.
Помогите разобраться с тем, какой конкретно участок кода можно распараллелить в моём алгоритме. У меня появились трудности в понимании, поскольку каждый промежуточный результат сортировки нужен для корректной сортировки последующего разряда (т.е. отсортировали по единицам - затем по десяткам и т.д.). А если разделить массив на потоки (например, на 6 частей) - будет выполнена только локальная сортировка.
#include <iostream>
#include <mpi.h>
#include <fstream>
using namespace std;
void fillArray(int* arr, int size)
{
for (int i = 0; i < size; i++)
{
arr[i] = i;
}
}
void swap(int* first, int* second)
{
int temp = *first;
*first = *second;
*second = temp;
}
void shuffleArray(int* arr, int size)
{
for (int i = 0; i < size; i++)
{
swap(&arr[i], &arr[rand() % (size - 1)]);
}
}
void printArrayToFile(int* arr, int size, ofstream &file)
{
for (int i = 0; i < size; i++)
{
file << arr[i] << " ";
if (!((i + 1) % 15))
file << endl;
}
file << endl << endl;
}
bool isSeq(int* arr, int size)
{
for (int i = 1; i < size; i++)
{
if (arr[i] + 1 != arr[i + 1])
return false;
}
return true;
}
void copyArr(int* target, int* src, int size)
{
for (int i = 0; i < size; i++)
{
target[i] = src[i];
}
}
int* getRankGroup(int* arr, int size, int rank, int digit)
{
// 1234 rank - 3
int* group, groupSize = 0, targetDigit;
for (int i = 0; i < size; i++)
{
targetDigit = arr[i];
int j = rank;
while (j > 1)
{
targetDigit /= 10;
j--;
}
if ((targetDigit % 10) == digit)
groupSize++;
}
group = new int[groupSize + 1];
group[0] = groupSize;
for (int i = 0, k = 1; i < size; i++)
{
targetDigit = arr[i];
int j = rank;
while (j > 1)
{
targetDigit /= 10;
j--;
}
if ((targetDigit % 10) == digit)
{
group[k] = arr[i];
k++;
}
}
return group;
}
int getMaxArrRank(int* arr, int size)
{
int rank = 0, max = 0;
for (int i = 0; i < size; i++)
{
if (arr[i] > max)
max = arr[i];
}
while (max)
{
rank++;
max /= 10;
}
return rank;
}
int* radixSort(int* arr, int size)
{
int* result = new int[size];
int* temp = new int[size];
int* group, rank = getMaxArrRank(arr, size);
copyArr(temp, arr, size);
for (int i = 1; i <= rank; i++)
{
int resultI = 0;
for (int j = 9; j > -1; j--)
{
group = getRankGroup(temp, size, i, j);
for (int k = 1; k <= group[0]; k++, resultI++)
{
result[resultI] = group[k];
}
delete[] group;
}
copyArr(temp, result, size);
}
delete[] temp;
return result;
}
int main(int argc, char* argv[])
{
int size, threadId, numOfThreads;
int* arr, * resL, * resP;
ofstream shuffled, sortedL, sortedP;
if (argc > 1)
{
size = atoi(argv[1]);
}
else
{
cout << "Size isn't specified. Default size = 100" << endl;
size = 100;
}
arr = new int[size];
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numOfThreads);
MPI_Comm_rank(MPI_COMM_WORLD, &threadId);
if (threadId == 0)
{
shuffled.open("shuffled.txt"); sortedL.open("sortedL.txt"); sortedP.open("sortedP.txt");
fillArray(arr, size);
shuffleArray(arr, size);
printArrayToFile(arr, size, shuffled);
resL = radixSort(arr, size);
printArrayToFile(resL, size, sortedL);
}
MPI_Bcast(arr, size, MPI_INT, 0, MPI_COMM_WORLD);
///
MPI_Reduce(NULL, resP, size, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);
if (threadId == 0)
{
shuffled.close(); sortedL.close(); sortedP.close();
}
MPI_Finalize();
return 0;
}
Разобрался в технологии MPI и пришёл к выводу о том, что 1 поток должен собирать данные с других потоков, которые будут сортировать массивы по отдельным символам.
Однако, осталась ошибка в коде, которую я не смог найти. Долго пытался разобраться, почему моя программа сортирует массив только до 5 разряда (на 6-ом один поток почему то не возвращает массив из функции getRankGroup), но так и не нашёл причину. В чём может быть причина?
void radixSortMPI(int* arr, int size, int* resultArr, int threadId, int numOfThreads)
{
int* temp = new int[size];
int* group;
if (threadId == 0)
copyArr(temp, arr, size);
for (int rank = 1; rank <= getMaxArrRank(arr, size); rank++)
{
MPI_Bcast(temp, size, MPI_INT, 0, MPI_COMM_WORLD);
int resultI = 0;
// Get data
if (threadId == 0)
{
for (int j = 9; j >= 0; j--)
{
int* recvBuf = new int[size];
MPI_Recv(recvBuf, size, MPI_INT, MPI_ANY_SOURCE, j,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int k = 1; k <= recvBuf[0]; k++, resultI++)
{
resultArr[resultI] = recvBuf[k];
}
delete[] recvBuf;
}
copyArr(temp, resultArr, size);
}
// Evaluations
else
{
int step = ceil(10. / (numOfThreads - 1));
int digit = (threadId - 1) * step;
for (int j = digit; (j < (digit + step)) && (j < 10); j++)
{
group = getRankGroup(temp, size, rank, j);
MPI_Send(group, group[0] + 1, MPI_INT, 0, j, MPI_COMM_WORLD);
delete[] group;
}
}
}
delete[] temp;
}