Поддержание выполнения определенного числа задач

У меня выполняется статистическое моделирование и для ускорения процесса на многоядерном процессоре хочется запускать его параллельными задачами.

У меня есть вот такой код:

    /// <summary>
    /// Моделирование по исходным данным на заданное количество реализаций
    /// </summary>
    /// <param name="Data">Исходные данные</param>
    /// <param name="NumberTasks">Число потоков</param>
    /// <param name="Cluster">Число реализаций на поток</param>
    /// <param name="NumberModeling">Необходимое число реализаций</param>
    /// <param name="token">Токен отмены моделирования</param>
    /// <param name="Progress">Интерфейс прогресса для отображения в интерфейсе</param>
    /// <returns></returns>
    public  static ModelingResult Modeling(StaticSystem Data, long Cluster, long NumberModeling, CancellationToken token, IProgress<double> Progress)
    {
        //Создаем новый таймер
        Stopwatch Timer = new Stopwatch();
        //запускаем таймер
        Timer.Start();
        //Создаем объект результат моделирования
        ModelingResult result = new ModelingResult();
        //сохраняем текущую дату
        DateTime Date = DateTime.Now;
        //Записываем дату в результат
        result.Date=Date;
        int NumberTasks = Data.Properties.NumberTasks;
        //создаем массив моделей системы
        DynamicSystem[] SystemModels = Factory.ProduceModels(Data, NumberTasks);
        //создаем список задач 
        List<Task<long>> ModelingTasks = new List<Task<long>>();
        OneTask[] TasksObjects = new OneTask[NumberTasks];
        //определяем необходимое число выполненных задач
        long TotalTasks = (long)Math.Floor((double)(NumberModeling / Cluster));
        //счетчик выполненных задач
        long NumberFinishedTasks = 0;
        //счетчик отказов
        long NumberFaults = 0;
        //номер текущей задачи
        int taskIndex;
        //устанавливаем номер текущей задачи
        int currentTaskIndex = NumberTasks;
        // заполняем массив задач и запускаем их
        long currentResult=0;
        //Временная переменная задачи
        Task<long> tempTask;
        for (taskIndex = 0; taskIndex < NumberTasks; taskIndex++)
        {
            //для каждого потока создаем отдельный объект
            TasksObjects[taskIndex] = new OneTask(SystemModels[taskIndex], Data.Properties, Cluster);
            //создаем новую задачу по объекту с нужным индексом 
            tempTask = new Task<long>(() => { return TasksObjects[taskIndex].Modeling(); });
            //добавляем новую задачу в список
            ModelingTasks.Add(tempTask);
            //Запускаем задачу
            tempTask.Start();
            if (ModelingTasks.IndexOf(tempTask) != taskIndex)
            {
                throw new Exception(Glossary.Messages["UnknownError"]);
            }
        }
        taskIndex = 0;

        //выполняем цикл, пока не достигнем нужного числа выполненных задач
        while (NumberFinishedTasks < TotalTasks) 
        {
            // Если поступил запрос на отмену моделирования
            if (token.IsCancellationRequested) 
            {
                //возвращаем пустой результат
                return null;                     
            }
            
            //если номер выполненной задачи входит в нужный интервал
            if (currentTaskIndex < NumberTasks)
            {

                // создаем новую задачу с теми же параметрами
                tempTask = new Task<long>(() => { return TasksObjects[currentTaskIndex].Modeling(); });
                //Вставляем новую задачу в список по нужному индексу
                ModelingTasks.Insert(currentTaskIndex,tempTask);
                //Запускаем новую задачу
                tempTask.Start();
                if (ModelingTasks.IndexOf(tempTask) != currentTaskIndex)
                {
                    throw new Exception(Glossary.Messages["UnknownError"]);
                }
            }

            //ждем, пока выполнится одна из задач
            Task<long> completedTask =  (Task.WhenAny(ModelingTasks)).Result;
            // и получаем ее индекс
            currentTaskIndex = ModelingTasks.IndexOf(completedTask);
            //Получаем результат выполненной задачи
            currentResult = completedTask.Result;

            if (!ModelingTasks.Contains(completedTask))
            {
                throw new Exception(Glossary.Messages["UnknownError"]);
            }

            //удаляем задачу из списка
            ModelingTasks.Remove(completedTask);
            //прибавляем число отказов в этой задаче к общему числу ошибок
            NumberFaults += currentResult;
            //увеличиваем счетчик выполненных задач
            NumberFinishedTasks++;
            double percent = (double)NumberFinishedTasks * (double)Cluster / (double)NumberModeling;
            Progress.Report(percent);
          completedTask.Dispose();              

        }

        long TotalModeling = TotalTasks * Cluster; //общее число реализаций моделирования
        double probability, lowProbability, highProbability;
        probability = 1 - ((double)NumberFaults / (double)TotalModeling); //вычисляем точечное значение показателя 
        result.Probability = probability; //записываем его в объект результата

        Basic.CalculateConfidenceBoards(probability, Data.Properties.ConfidenceProbability, TotalModeling, out lowProbability, out highProbability); //вычисляем доверительные границы показателя 
        result.LowBoard = lowProbability; //записываем нижнюю границу
        result.HighBoard = highProbability; //записываем верхнюю границу     
        Timer.Stop(); //останавливаем таймер
        result.TotalTimeOfModeling = (Timer.ElapsedMilliseconds)/1000.0; //записываем время выполнения в результаты
        return result; //возвращаем результат

Проблема в том, что пока я его запускаю с одним потоком (NumberTasks), всё прекрасно работает. Если же запускаю больше одного потока, то постоянно вываливаются разные ошибки, в основном связанные с тем, что не находит нужного индекса в списках уже в реализации модели. При этом при малом количестве реализаций (NumberModeling) бывает и проскакивает. С чем это может быть связано?


Ответы (1 шт):

Автор решения: InterceptorTSK

В вашем случае нужно весьма долго изучать что такое потоки, и как это работает. Причём, нужно основательно в этом разобраться, потому что всякое "производное" - следует строго из базовых вещей. Всё всегда построено на "базе", а производное - всегда суть обёртки, синтаксический сахар, и так далее. Краткая суть здесь всегда следующая.

  1. Существует метод, метод исполняется на стеке, т.е. стек - существует. Но это не точно, и это уже проблема.

  2. Существует куча, там экземпляры объектов лежат. Куча существует всегда.

  3. Стеков может быть несколько, это очень важное понимание. Именно под несколько потоков создаются "свои" стеки. Но тут и есть проблема. Никогда не известно - а существует ли тот или иной стек. Иначе говоря вы запускаете три потока, это три стека. Но когда они создаются и создаются ли - это не известно. И когда завершаются, и завершаются ли - это тоже не известно.

  4. Из пункта 3 следует весьма не банальность - никогда нельзя пересекать стеки, потому что сам смысл пересечения стеков - не определён, потому что не известно, а существует ли тот или иной стек?

  5. Как быть? Хранить результаты работы стеков в/на куче, ибо куча всегда существует.

Простейший код объясняющий работу многопотока. Вы пишете у вас есть объект, у объекта есть метод, но опять же вы пишете, что объекты могут быть для потоков свои, и это весьма правильно.

Создадим класс объекта, не экземпляр, но класс. Где простейшее поле, и метод увеличивающий это поле.

public class MyObject
{
    public long Field;
    
    public void DoMethod()
    {
        Field += 1;
    }
}

Далее как обычно пусть консоль, там Program и Main() как точка входа. Теперь нужны пусть 4 экземпляра класса MyObject, и 4 потока на каждый экземпляр. Естественно нужно замерить время работы. И 4 метода для каждого потока, поток - это метод, а метод - это стек для метода, и эти методы т.е. стеки - не пересекаются, т.е. методы в своих стеках, а результаты работы методов пишутся в кучу, в свои собственные поля Field каждого экземпляра MyObject.

Для работы System.Diagnostics.Stopwatch нужно подцепить сборку System

public static class Program
{
    public static MyObject obj0, obj1, obj2, obj3;

    public static void Main()
    {
        System.Console.WriteLine("Any to start");
        System.Console.ReadLine();

        // Это дополнительный "код", он "барабанит" ядро процессора
        // просто так, тем самым повышая частоту проца до максимальной
        // Такой код обычно называется "прогревашкой"
        long tmp = 0;
        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();
        for (long i = 0; i < 4000000000; i++) tmp += 1;
        sw.Stop();
        System.Console.WriteLine("Progrevashka time {0}ms", sw.ElapsedMilliseconds);

        // Начало собственно кода здесь, создаём экземпляры в/на куче
        // для каждого потока свой Object
        obj0 = new MyObject();
        obj1 = new MyObject();
        obj2 = new MyObject();
        obj3 = new MyObject();

        // Создаём потоки, указываем методы
        // Методы для каждого потока свои, это четыре стека
        System.Threading.Thread
            t0 = new System.Threading.Thread(ThreadMethod0),
            t1 = new System.Threading.Thread(ThreadMethod1),
            t2 = new System.Threading.Thread(ThreadMethod2),
            t3 = new System.Threading.Thread(ThreadMethod3);

        // Запускаем потоки, создаются стеки, и на стеках работают методы
        t0.Start();
        t1.Start();
        t2.Start();
        t3.Start();

        // Вот здесь всё сложнее, каждый поток возвращает результат потока
        // обычно это ноль, если всё идёт "хорошо", но если что то идёт
        // не так, но это не ноль, и строго говоря нужно проверять
        // чем завершился поток
        t0.Join();
        t1.Join();
        t2.Join();
        t3.Join();

        // Все потоки завершили работу, суммируем поля объектов
        // которые на куче и всегда существуют
        System.Console.WriteLine(
            obj0.Field +
            obj1.Field +
            obj2.Field +
            obj3.Field);
        System.Console.ReadLine();
    }

    // Методы для потоков, где каждый метод работает со своим объектом
    // и метод никогда не лезет в другой метод другого потока, т.е. стека
    // ибо не известно, а существует ли другой стек
    public static void ThreadMethod0()
    {
        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        for (long i = 0; i < 1000000000; i++)
        {
            obj0.DoMethod();
        }

        sw.Stop();
        System.Console.WriteLine("Thread0 time {0}ms", sw.ElapsedMilliseconds);
    }
    // Ещё метод 1
    public static void ThreadMethod1()
    {
        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        for (long i = 0; i < 1000000000; i++)
        {
            obj1.DoMethod();
        }

        sw.Stop();
        System.Console.WriteLine("Thread1 time {0}ms", sw.ElapsedMilliseconds);
    }
    // Ещё метод 2
    public static void ThreadMethod2()
    {
        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        for (long i = 0; i < 1000000000; i++)
        {
            obj2.DoMethod();
        }

        sw.Stop();
        System.Console.WriteLine("Thread2 time {0}ms", sw.ElapsedMilliseconds);
    }
    // Ещё метод 3
    public static void ThreadMethod3()
    {
        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        for (long i = 0; i < 1000000000; i++)
        {
            obj3.DoMethod();
        }

        sw.Stop();
        System.Console.WriteLine("Thread3 time {0}ms", sw.ElapsedMilliseconds);
    }
}

Результаты на древней предельно стабильной тестовой станции

i5-3570k all 4 cores locked, Z77, no-HT, Framework 4.8.2 x64/release

Any to start
Progrevashka time 1487ms
Thread0 time 2269ms
Thread2 time 2281ms
Thread1 time 2300ms
Thread3 time 2337ms
4000000000

Если в 1 поток, результат в районе 6600ms

Суть тут следующая, многопоток - это одна из сложнейших тем, и разбираться нужно с самого начала. Потому что как было написано выше, без понимания базовых вещей лезть в ещё более сложные производные вещи от базовых - бессмысленно.

Базовое программирование многопотока - это свой стек/метод/объект на каждый поток. И синхронизация результатов строго в конце, это база многопотока. Пожалуйста, разберитесь в простейшем коде, и почитайте соответствующую литературу, и уже далее действительно можно [наверное] пробовать писать сложные вещи, а вещи здесь действительно - весьма сложные.

Синхронное/асинхронное программирование ещё сложнее устроено.

Пс: Не для автора вопроса. Постоянное пересоздание стека, т.е. не переиспользование стека - это вопрос философии. Потоки создаются что бы поработать и самозакрыться. А создать новый поток - не проблема. Для простейших проектов и простейшего распараллеливания - это очень простой и надёжный путь, создавать потоки под задачи, и не держать потоки на сложной синхронизации. Опять же по замерам миллион потоков [т.е. миллион стеков] с простейшей операцией на потоке/стеке создаются и прибиваются миллион штук в секунду, протестировано.

→ Ссылка