Почему при увеличении количества асинхронных операций растет время выполнения

Использование асинхронных чтения и записи дает невероятный выигрыш в скорости выполнения, однако не могу понять один момент. Если сделать количество асинхронных операций слишком большим, то скорость выполнения увеличивается (если конкретнее, то я про aio_read и aio_write из библиотеки aio). Я примерно понимаю что машина расходует дополнительные ресурсы, но хотелось бы лучше узнать о причинах такого поведения. Полагаю, что-то связано с оперативной памятью или пропускной способностью шины. Буду рад любой помощи.

Edited: Правильно люди подметили, что надо было код приложить. Если коротко, то у нас есть массив структур, размер которого задает пользователь:

struct aio_operation {
    struct aiocb* aio;
    char *buffer;
    int write_operation;
    int isWorking;
    int chunk;
    int readFileHandler;
    int writeFileHandler;
    int operationsCount;
    int fileSize;
};

Программа копирует файл (размером примерно в гиг), считывает из одного блоками маленького и размера и записывает в другой сразу.

В цикле все операции запускаются на чтение:

for(int i = 0; i < operationsCount; i++)
    {
        if (operations[i].aio->aio_offset >= fileSize)
        {
            operations[i].isWorking = 0;
        }
        else
        {
            operations[i].write_operation = 1;

            if (operations[i].aio->aio_offset +  operations[i].chunk > fileSize)
            {
                operations[i].aio->aio_nbytes = fileSize - operations[i].aio->aio_offset;
            }

            if (aio_read(operations[i].aio) == -1)
            {
                puts("Error: Can't async read file");
                return;
            }
        }
    }

А по окончании операции вызывается функция завершения, где эта же операция уже запускается на запись:

static void aio_completion_handler(sigval_t sigval) {
    struct aio_operation *aio_op = (struct aio_operation *)sigval.sival_ptr;

    if (aio_op->write_operation)
    {
        const struct aiocb *liste[] = {aio_op->aio};
        aio_op->write_operation = 0;
        aio_op->aio->aio_fildes = aio_op->writeFileHandler;

        if (aio_write(aio_op->aio) == -1)
        {
            puts("Error: Can't async write in file");
            return;
        }

    }
    else
    {
        const struct aiocb *liste[] = {aio_op->aio};
        aio_suspend(reinterpret_cast<const aiocb *const *>(&liste), 1, nullptr);

        if (aio_op->isWorking)
       {
            aio_op->aio->aio_offset += aio_op->chunk * aio_op->operationsCount;
            aio_op->aio->aio_fildes = aio_op->readFileHandler;

            if (aio_op->aio->aio_offset >= aio_op->fileSize)
            {
                aio_op->isWorking = 0;
            }
            else
            {
                aio_op->write_operation = 1;
                //printf("%d reading\n", aio_op->id);

                if (aio_op->aio->aio_offset + aio_op->chunk > aio_op->fileSize)
                {
                    aio_op->aio->aio_nbytes = aio_op->fileSize - aio_op->aio->aio_offset;
                }

                if (aio_read(aio_op->aio) == -1)
                {
                    puts("Error: Can't async read file");
                    return;
                }

            }
       }

    }
}

После записи операция запускается обратно на чтение. И так они перезапускаются до тех пор, пока файл не будет скопирован полностью. И если построить зависимость времени выполнения от количества этих самых операций, то мы получим вот такой график (где Y - затраченное время, Х - количество операций):

Код целиком:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <aio.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <chrono>
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>

#include <stdint.h>
#include <inttypes.h>
#define PATH_SIZE 200
#define CLUSTER_SIZE 512

int writeFileHandler;
int readFileHandler;
int operationsCount;
int fileSize;

struct aio_operation {
    struct aiocb* aio;
    char *buffer;
    int write_operation;
    int isWorking;
    int chunk;
    int readFileHandler;
    int writeFileHandler;
    int operationsCount;
    int fileSize;
};

void DeleteNSymbol(char* label)
{
    for (int i = 0; i < 200; i++)
    {
        if (label[i] == '\n')
        {
            label[i] = '\0';
        }

        if (label[i] == '\0')
        {
            return;
        }
    }
}

void FreeMemory(struct aio_operation* operations, int n)
{
    for (int i = 0; i < n; i++)
        {
            free(operations[i].buffer);
            free(operations[i].aio);
        }

    free(operations);
}

static void aio_completion_handler(sigval_t sigval) {
    struct aio_operation *aio_op = (struct aio_operation *)sigval.sival_ptr;

    if (aio_op->write_operation)
    {
        const struct aiocb *liste[] = {aio_op->aio};
        aio_suspend(reinterpret_cast<const aiocb *const *>(&liste), 1, nullptr);
        aio_op->write_operation = 0;
        aio_op->aio->aio_fildes = aio_op->writeFileHandler;

        if (aio_write(aio_op->aio) == -1)
        {
            puts("Error: Can't async write in file");
            return;
        }

    }
    else
    {
        const struct aiocb *liste[] = {aio_op->aio};
        aio_suspend(reinterpret_cast<const aiocb *const *>(&liste), 1, nullptr);

        if (aio_op->isWorking)
       {
            aio_op->aio->aio_offset += aio_op->chunk * aio_op->operationsCount;
            aio_op->aio->aio_fildes = aio_op->readFileHandler;

            if (aio_op->aio->aio_offset >= aio_op->fileSize)
            {
                aio_op->isWorking = 0;
            }
            else
            {
                aio_op->write_operation = 1;
                //printf("%d reading\n", aio_op->id);

                if (aio_op->aio->aio_offset + aio_op->chunk > aio_op->fileSize)
                {
                    aio_op->aio->aio_nbytes = aio_op->fileSize - aio_op->aio->aio_offset;
                }

                if (aio_read(aio_op->aio) == -1)
                {
                    puts("Error: Can't async read file");
                    return;
                }

            }
       }

    }
}

void InitOperations(struct aio_operation* operations, int cluster)
{
    for (int i = 0; i < operationsCount; i++)
    {
        operations[i].aio = (struct aiocb*)malloc(sizeof(struct aiocb));
        memset(operations[i].aio, 0, sizeof(*(operations[i].aio)));
        operations[i].write_operation = 0;
        operations[i].aio->aio_sigevent.sigev_notify = SIGEV_THREAD;
        operations[i].aio->aio_sigevent.sigev_notify_function = aio_completion_handler;
        operations[i].aio->aio_sigevent.sigev_value.sival_ptr = &operations[i];
        operations[i].chunk = cluster;
        operations[i].buffer = (char*)calloc(cluster, sizeof(char));
        operations[i].readFileHandler = readFileHandler;
        operations[i].writeFileHandler = writeFileHandler;
        operations[i].operationsCount = operationsCount;
        operations[i].fileSize = fileSize;

        if (cluster > fileSize)
        {
            operations[i].aio->aio_nbytes = fileSize;
        }
        else
        {
            operations[i].aio->aio_nbytes = cluster;
        }

        operations[i].aio->aio_fildes = readFileHandler;
        operations[i].aio->aio_offset = cluster * i;
        operations[i].aio->aio_buf = operations[i].buffer;
        operations[i].isWorking = 1;
    }
}



void StartOperations(struct aio_operation* operations)
{
    int counter = 0;

    for(int i = 0; i < operationsCount; i++)
    {
        if (operations[i].aio->aio_offset >= fileSize)
        {
            operations[i].isWorking = 0;
        }
        else
        {
            operations[i].write_operation = 1;

            if (operations[i].aio->aio_offset +  operations[i].chunk > fileSize)
            {
                operations[i].aio->aio_nbytes = fileSize - operations[i].aio->aio_offset;
            }

            if (aio_read(operations[i].aio) == -1)
            {
                puts("Error: Can't async read file");
                return;
            }
        }
    }

    while (counter < operationsCount)
    {
        counter = 0;

        for(int i = 0; i < operationsCount; i++)
        {
            if (!operations[i].isWorking)
                counter++;
        }

        usleep(1);
    }

}

void ReadPath(char* filePath)
{
    fgets(filePath, PATH_SIZE, stdin);
    DeleteNSymbol(filePath);
}

int main(void)
{
    int cluster;
    //clock_t startProg, endProg;
    struct stat fileStat;
    struct aio_operation* operations;
    char newFilePath[PATH_SIZE];
    char filePath[PATH_SIZE];

    puts("Print the path of file, that you want to copy:");
    ReadPath(filePath);
    puts("Print the path of new file, where you want to copy first file:");
    ReadPath(newFilePath);

    puts("Enter processes count: ");
    scanf("%d", &operationsCount);
    operations = (struct aio_operation*)calloc(operationsCount, sizeof(struct aio_operation));
    puts("Enter block of data size: ");
    scanf("%d", &cluster);
    cluster *= CLUSTER_SIZE;

    auto startProg = std::chrono::high_resolution_clock::now();

    readFileHandler = open(filePath, O_RDONLY | O_NONBLOCK, 0666);
    writeFileHandler = open(newFilePath, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, 0666);

    if (readFileHandler == -1 || writeFileHandler == -1)
    {
        puts("Error: Can't open file");
        free(operations);
        return -1;
    }

    if (fstat(readFileHandler, &fileStat) == 1)
    {
        puts("Error: Can't get opened file info");
        close(readFileHandler);
        close(writeFileHandler);
        free(operations);
        return -1;
    }

    fileSize = fileStat.st_size;
    printf("File size - %d\n", fileSize);

    InitOperations(operations, cluster);
    StartOperations(operations);

    auto endProg = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endProg - startProg).count();
    close(readFileHandler);
    close(writeFileHandler);
    puts("Copy completed");
    printf("Time spent: %ld ms\n", duration);
    FreeMemory(operations, operationsCount);
    return 0;
}

Ответы (0 шт):