Почему при увеличении количества асинхронных операций растет время выполнения
Использование асинхронных чтения и записи дает невероятный выигрыш в скорости выполнения, однако не могу понять один момент. Если сделать количество асинхронных операций слишком большим, то скорость выполнения увеличивается (если конкретнее, то я про aio_read и aio_write из библиотеки aio). Я примерно понимаю что машина расходует дополнительные ресурсы, но хотелось бы лучше узнать о причинах такого поведения. Полагаю, что-то связано с оперативной памятью или пропускной способностью шины. Буду рад любой помощи.
Edited: Правильно люди подметили, что надо было код приложить. Если коротко, то у нас есть массив структур, размер которого задает пользователь:
struct aio_operation {
struct aiocb* aio;
char *buffer;
int write_operation;
int isWorking;
int chunk;
int readFileHandler;
int writeFileHandler;
int operationsCount;
int fileSize;
};
Программа копирует файл (размером примерно в гиг), считывает из одного блоками маленького и размера и записывает в другой сразу.
В цикле все операции запускаются на чтение:
for(int i = 0; i < operationsCount; i++)
{
if (operations[i].aio->aio_offset >= fileSize)
{
operations[i].isWorking = 0;
}
else
{
operations[i].write_operation = 1;
if (operations[i].aio->aio_offset + operations[i].chunk > fileSize)
{
operations[i].aio->aio_nbytes = fileSize - operations[i].aio->aio_offset;
}
if (aio_read(operations[i].aio) == -1)
{
puts("Error: Can't async read file");
return;
}
}
}
А по окончании операции вызывается функция завершения, где эта же операция уже запускается на запись:
static void aio_completion_handler(sigval_t sigval) {
struct aio_operation *aio_op = (struct aio_operation *)sigval.sival_ptr;
if (aio_op->write_operation)
{
const struct aiocb *liste[] = {aio_op->aio};
aio_op->write_operation = 0;
aio_op->aio->aio_fildes = aio_op->writeFileHandler;
if (aio_write(aio_op->aio) == -1)
{
puts("Error: Can't async write in file");
return;
}
}
else
{
const struct aiocb *liste[] = {aio_op->aio};
aio_suspend(reinterpret_cast<const aiocb *const *>(&liste), 1, nullptr);
if (aio_op->isWorking)
{
aio_op->aio->aio_offset += aio_op->chunk * aio_op->operationsCount;
aio_op->aio->aio_fildes = aio_op->readFileHandler;
if (aio_op->aio->aio_offset >= aio_op->fileSize)
{
aio_op->isWorking = 0;
}
else
{
aio_op->write_operation = 1;
//printf("%d reading\n", aio_op->id);
if (aio_op->aio->aio_offset + aio_op->chunk > aio_op->fileSize)
{
aio_op->aio->aio_nbytes = aio_op->fileSize - aio_op->aio->aio_offset;
}
if (aio_read(aio_op->aio) == -1)
{
puts("Error: Can't async read file");
return;
}
}
}
}
}
После записи операция запускается обратно на чтение. И так они перезапускаются до тех пор, пока файл не будет скопирован полностью. И если построить зависимость времени выполнения от количества этих самых операций, то мы получим вот такой график (где Y - затраченное время, Х - количество операций):
Код целиком:
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <aio.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <chrono>
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>
#include <stdint.h>
#include <inttypes.h>
#define PATH_SIZE 200
#define CLUSTER_SIZE 512
int writeFileHandler;
int readFileHandler;
int operationsCount;
int fileSize;
struct aio_operation {
struct aiocb* aio;
char *buffer;
int write_operation;
int isWorking;
int chunk;
int readFileHandler;
int writeFileHandler;
int operationsCount;
int fileSize;
};
void DeleteNSymbol(char* label)
{
for (int i = 0; i < 200; i++)
{
if (label[i] == '\n')
{
label[i] = '\0';
}
if (label[i] == '\0')
{
return;
}
}
}
void FreeMemory(struct aio_operation* operations, int n)
{
for (int i = 0; i < n; i++)
{
free(operations[i].buffer);
free(operations[i].aio);
}
free(operations);
}
static void aio_completion_handler(sigval_t sigval) {
struct aio_operation *aio_op = (struct aio_operation *)sigval.sival_ptr;
if (aio_op->write_operation)
{
const struct aiocb *liste[] = {aio_op->aio};
aio_suspend(reinterpret_cast<const aiocb *const *>(&liste), 1, nullptr);
aio_op->write_operation = 0;
aio_op->aio->aio_fildes = aio_op->writeFileHandler;
if (aio_write(aio_op->aio) == -1)
{
puts("Error: Can't async write in file");
return;
}
}
else
{
const struct aiocb *liste[] = {aio_op->aio};
aio_suspend(reinterpret_cast<const aiocb *const *>(&liste), 1, nullptr);
if (aio_op->isWorking)
{
aio_op->aio->aio_offset += aio_op->chunk * aio_op->operationsCount;
aio_op->aio->aio_fildes = aio_op->readFileHandler;
if (aio_op->aio->aio_offset >= aio_op->fileSize)
{
aio_op->isWorking = 0;
}
else
{
aio_op->write_operation = 1;
//printf("%d reading\n", aio_op->id);
if (aio_op->aio->aio_offset + aio_op->chunk > aio_op->fileSize)
{
aio_op->aio->aio_nbytes = aio_op->fileSize - aio_op->aio->aio_offset;
}
if (aio_read(aio_op->aio) == -1)
{
puts("Error: Can't async read file");
return;
}
}
}
}
}
void InitOperations(struct aio_operation* operations, int cluster)
{
for (int i = 0; i < operationsCount; i++)
{
operations[i].aio = (struct aiocb*)malloc(sizeof(struct aiocb));
memset(operations[i].aio, 0, sizeof(*(operations[i].aio)));
operations[i].write_operation = 0;
operations[i].aio->aio_sigevent.sigev_notify = SIGEV_THREAD;
operations[i].aio->aio_sigevent.sigev_notify_function = aio_completion_handler;
operations[i].aio->aio_sigevent.sigev_value.sival_ptr = &operations[i];
operations[i].chunk = cluster;
operations[i].buffer = (char*)calloc(cluster, sizeof(char));
operations[i].readFileHandler = readFileHandler;
operations[i].writeFileHandler = writeFileHandler;
operations[i].operationsCount = operationsCount;
operations[i].fileSize = fileSize;
if (cluster > fileSize)
{
operations[i].aio->aio_nbytes = fileSize;
}
else
{
operations[i].aio->aio_nbytes = cluster;
}
operations[i].aio->aio_fildes = readFileHandler;
operations[i].aio->aio_offset = cluster * i;
operations[i].aio->aio_buf = operations[i].buffer;
operations[i].isWorking = 1;
}
}
void StartOperations(struct aio_operation* operations)
{
int counter = 0;
for(int i = 0; i < operationsCount; i++)
{
if (operations[i].aio->aio_offset >= fileSize)
{
operations[i].isWorking = 0;
}
else
{
operations[i].write_operation = 1;
if (operations[i].aio->aio_offset + operations[i].chunk > fileSize)
{
operations[i].aio->aio_nbytes = fileSize - operations[i].aio->aio_offset;
}
if (aio_read(operations[i].aio) == -1)
{
puts("Error: Can't async read file");
return;
}
}
}
while (counter < operationsCount)
{
counter = 0;
for(int i = 0; i < operationsCount; i++)
{
if (!operations[i].isWorking)
counter++;
}
usleep(1);
}
}
void ReadPath(char* filePath)
{
fgets(filePath, PATH_SIZE, stdin);
DeleteNSymbol(filePath);
}
int main(void)
{
int cluster;
//clock_t startProg, endProg;
struct stat fileStat;
struct aio_operation* operations;
char newFilePath[PATH_SIZE];
char filePath[PATH_SIZE];
puts("Print the path of file, that you want to copy:");
ReadPath(filePath);
puts("Print the path of new file, where you want to copy first file:");
ReadPath(newFilePath);
puts("Enter processes count: ");
scanf("%d", &operationsCount);
operations = (struct aio_operation*)calloc(operationsCount, sizeof(struct aio_operation));
puts("Enter block of data size: ");
scanf("%d", &cluster);
cluster *= CLUSTER_SIZE;
auto startProg = std::chrono::high_resolution_clock::now();
readFileHandler = open(filePath, O_RDONLY | O_NONBLOCK, 0666);
writeFileHandler = open(newFilePath, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, 0666);
if (readFileHandler == -1 || writeFileHandler == -1)
{
puts("Error: Can't open file");
free(operations);
return -1;
}
if (fstat(readFileHandler, &fileStat) == 1)
{
puts("Error: Can't get opened file info");
close(readFileHandler);
close(writeFileHandler);
free(operations);
return -1;
}
fileSize = fileStat.st_size;
printf("File size - %d\n", fileSize);
InitOperations(operations, cluster);
StartOperations(operations);
auto endProg = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endProg - startProg).count();
close(readFileHandler);
close(writeFileHandler);
puts("Copy completed");
printf("Time spent: %ld ms\n", duration);
FreeMemory(operations, operationsCount);
return 0;
}
