Поддержание количества выполняемых параллельно задач

В продолжение вопроса Поддержание выполнения определенного числа задач Грубо говоря мне надо выполнить 10^9 раз метод на объекте. Для экономии времени на многоядерном процессоре, я хотел запускать метод на на нескольких копиях объекта, причем относительно небольшими порциями, чтобы количество выполняемых методов оставалось всегда одинаковым. Я набросал простой пример, логика выполнения и взаимодействия в ообщем-то соблюдена.

Класс данных - в нем только три поля и конструктор

    /// <summary>
    /// Класс данных
    /// </summary>
    public class SystemData
    {
        /// <summary>
        /// Число элементов
        /// </summary>
        public int Number;

        /// <summary>
        /// Число запасных
        /// </summary>
        public int SpareNumber;

        /// <summary>
        /// Вероятность 
        /// </summary>
        public double probability;

        /// <summary>
        /// Конструктор по умолчанию
        /// </summary>
        public SystemData ()
        {
            Number=1;
            SpareNumber=0;
            probability=1.0;
        }

        /// <summary>
        /// Конструктор с всеми параметрами
        /// </summary>
        /// <param name="Number">Число элементов</param>
        /// <param name="SpareNumber">Число запасных</param>
        /// <param name="probability">Вероятность</param>
        public SystemData(int Number, int SpareNumber, double probability)
        {
            this.Number = Number;
            this.SpareNumber = SpareNumber;
            this.probability = probability;
        } 

Класс модели - в нем те же три поля, необходимый для моделирования список, генератор случайных чисел, и единственный метод, выдающий bool.

   /// <summary>
    /// Класс модели
    /// </summary>
    public class SystemModel
    {

        /// <summary>
        /// Число элементов
        /// </summary>
       public int Number;

       /// <summary>
       /// Число запасных
       /// </summary>
        public int SpareNumber;

        /// <summary>
        /// Вероятность 
        /// </summary>
        public double probability;

        /// <summary>
        /// Список инцидентов
        /// </summary>
        private List<double> AccidentsList;

        /// <summary>
        /// Генератор случайных чисел
        /// </summary>
        private Random RND;

        /// <summary>
        /// Конструктор по объекту данных
        /// </summary>
        /// <param name="Data"></param>
        public SystemModel(SystemData Data)
        {
            Number = Data.Number;
            SpareNumber = Data.SpareNumber;
            probability = Data.probability;
            AccidentsList = new List<double>();
            RND = new Random();
            for (int index=0; index<SpareNumber;index++)
            {
                AccidentsList.Add(RND.NextDouble());
            }                
        }

        /// <summary>
        /// Метод однократного моделирования
        /// </summary>
        /// <returns>ложь, если запасных не хватило</returns>
        public bool Modeling()
        {
            for (int index = 0; index < Number; index++)
            {
                if (AccidentsList.Count() == 0)
                {
                    return false;
                }
                if (RND.NextDouble() > probability)
                {
                    AccidentsList.RemoveAt(0);
                }
            }
            return true;
        }
    }

Класс для моделирования заданного числа реализаций - два поля и единственный метод, выдающий число полученных false.

        /// <summary>
        /// Модель на которой выполняется моделирование
        /// </summary>
        private SystemModel Model;

        /// <summary>
        /// Количество выполняемых реализаций
        /// </summary>
        private long Cluster;

        /// <summary>
        /// конструктор со всеми параметрами
        /// </summary>
        public OneTask(SystemModel Model, long Cluster)
        {
            this.Model = Model;
            this.Cluster = Cluster;

        }

        /// <summary>
        /// Метод моделирует заданное число реализаций
        /// </summary>
        /// <returns>количество отказов</returns>
        internal long FaultsModeling()
        {
            long NumberFaults = 0;
            for (long IndexModeling = 0; IndexModeling < Cluster; IndexModeling++)
            {
                if (!Model.Modeling())
                {
                    NumberFaults++;
                }
            }
            return NumberFaults;
        }

Класс моделирования в нескольких "потоках"

    /// <summary>
    /// Класс для моделирования в заданном числе потоков
    /// </summary>
    public class MultiTask
    {
        /// <summary>
        /// объект данных
        /// </summary>
        private SystemData Data;

        /// <summary>
        /// Число выполняемых реализаций
        /// </summary>
        private long NumberModeling;

        /// <summary>
        /// Число реализаций выполняемых в одном потоке
        /// </summary>
        private long Cluster;

        /// <summary>
        /// Число параллельных потоков
        /// </summary>
        private int NumberTasks;

        /// <summary>
        /// Конструктор со всеми параметрами
        /// </summary>
        /// <param name="Data"></param>
        /// <param name="NumberModeling"></param>
        /// <param name="Cluster"></param>
        /// <param name="NumberTasks"></param>
        public MultiTask(SystemData Data, long NumberModeling, long Cluster, int NumberTasks)
        {
            this.Data = Data;
            this.NumberModeling = NumberModeling;
            this.Cluster = Cluster;
            this.NumberTasks = NumberTasks;
        }

        public double GetResult()
        {
            //Общее число отказов
            long TotalFaults = 0;
            //создаем список задач 
            List<Task<long>> ModelingTasks = new List<Task<long>>();
            //определяем необходимое число выполненных задач
            long TotalTasks = (long)Math.Floor((double)(NumberModeling / Cluster));
            //счетчик выполненных задач
            long NumberFinishedTasks = 0;
            //текущий результат
            long currentResult;
            //временная модель системы
            SystemModel tempModel;
            //Временная переменная задачи
            Task<long> tempTask;

            //Заполняем список задач
            for (int taskIndex = 0; taskIndex < NumberTasks; taskIndex++)
            {
                //Создаем новый объект модели системы
                tempModel = new SystemModel(Data);
                //создаем новую задачу  
                tempTask = new Task<long>(() => { return new OneTask(tempModel, Cluster).FaultsModeling(); });
                //добавляем новую задачу в список
                ModelingTasks.Add(tempTask);
                //Запускаем задачу
                tempTask.Start();
            }

            //выполняем цикл, пока не достигнем нужного числа выполненных задач
            while (NumberFinishedTasks < TotalTasks)
            {
                //ждем, пока выполнится одна из задач
                Task<long> completedTask = (Task.WhenAny(ModelingTasks)).Result;
                //Получаем результат выполненной задачи
                currentResult = completedTask.Result;
                //удаляем задачу из списка
                ModelingTasks.Remove(completedTask);
                completedTask.Dispose();
                //прибавляем число отказов в этой задаче к общему числу ошибок
                TotalFaults += currentResult;
                //увеличиваем счетчик выполненных задач
                NumberFinishedTasks++;
                //Создаем новый объект модели системы
                tempModel = new SystemModel(Data);
                // создаем новую задачу 
                tempTask = new Task<long>(() => { return new OneTask(tempModel, Cluster).FaultsModeling(); });
                //Вставляем новую задачу в список 
                ModelingTasks.Add(tempTask);
                //Запускаем новую задачу
                tempTask.Start();
            }

            //общее число реализаций моделирования
            long TotalModeling = TotalTasks * Cluster;
            //Вычисляем итоговую вероятность
            double result =1- (double)TotalFaults / (double)TotalModeling;
            //возвращаем результат
            return result; 
        }

    }

Всё это просто проверил в консоли

{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Введите число элементов");
            int Number = int.Parse(Console.ReadLine());
            Console.WriteLine("Введите число запасных");
            int SpareNumber = int.Parse(Console.ReadLine());
            Console.WriteLine("Введите вероятность работы");
            double probality = double.Parse(Console.ReadLine());

            SystemData Data = new SystemData(Number, SpareNumber, probality);

            Console.WriteLine("Введите необходимое число моделирований");
            long NumberModeling = long.Parse(Console.ReadLine());
            Console.WriteLine("Введите число моделирований в одном кластере");
            long Cluster = long.Parse(Console.ReadLine());
            Console.WriteLine("Введите число параллельных потоков");
            int NumberTasks = int.Parse(Console.ReadLine());

            MultiTask MultiModel = new MultiTask(Data, NumberModeling, Cluster, NumberTasks);

            double result = MultiModel.GetResult();

            Console.WriteLine("Получен результат: {0}", result);
        }
    }

Тестовый пример 100 - 12 -0,9. 10^8 реализаций на одном потоке выполняются без проблем. Но если сделать 10 потоков и 10^6 реализаций, то появляется ошибка, связанная со списком AccidentsList из класса SystemModel (Индекс за пределами диапазона. Индекс должен быть положительным числом, а его размер не должен превышать размер коллекции.). Хотя ровно перед удалением элемента из списка происходит проверка, что список не пустой. И список создается заново в конструкторе, у каждого объекта он свой. Откуда может быть такая особенность? Что я делаю не так? Или это Task.WhenAny так странно работает?

Еще раз спасибо @EvgeniyZ за критику и предложения по моему вопросу, постарался учесть.


Ответы (1 шт):

Автор решения: CrazyElf
//временная модель системы
SystemModel tempModel;
...
for (int taskIndex = 0; taskIndex < NumberTasks; taskIndex++)
{
...
    //Создаем новый объект модели системы
    tempModel = new SystemModel(Data);
    ^^^^^^^^^
    // создаем новую задачу 
    tempTask = new Task<long>(() => { return new OneTask(tempModel, Cluster).FaultsModeling(); });
                                                         ^^^^^^^^^
...
}

У вас типичная проблема с замыканием. В создаваемую задачу передаётся ссылка на внешний объект, который через какое-то время заменяется на другой (ссылка остаётся та же, а объект по этой ссылке уже другой). В итоге получается, что после полного прохождения цикла по задачам, все ваши задачи работают с одной и той же SystemModel - с той, которая была создана на последнем проходе цикла. Таким образом, у вас получается многопоточная работа с одним и тем же AccidentsList внутри одной SystemModel. Вот и получается, что два (или более) потока одновременно проверили, что AccidentsList.Count() не равен нулю и пошли делать AccidentsList.RemoveAt(0);, но если в списке был, к примеру, только один элемент, удалось удаление только тому потоку, который успел раньше, а остальные упадут с ошибкой, что удалять то уже и нечего. Типичная "гонка потоков".

// это условие одновременно проверяют несколько потоков
if (AccidentsList.Count() == 0)
{
    return false;
}
// сюда может попасть несколько потоков
if (RND.NextDouble() > probability)
{
    // элемент в списке один, а потоков сюда зашло много => исключение
    AccidentsList.RemoveAt(0);
}

В общем проблема легко решается созданием нового объекта внутри создаваемой задачи (а не снаружи):

// создаем новую задачу и создаем новый объект модели системы
tempTask = new Task<long>(() => { return new OneTask(new SystemModel(Data), Cluster).FaultsModeling(); });
                                                     ^^^^^^^^^^^^^^^^^^^^^

Я бы вообще создание этого объекта внутрь OneTask перенёс так то.

В общем, после этого изменения мультитаск работает без ошибок.

P.S. Мораль простая. Нужно тщательно следить, чтобы параллельно обрабатывались разные (не одни и те же) данные. А при целенаправленной параллельной обработке одних и тех же разделяемых данных нужно использовать примитивы синхронизации.

→ Ссылка