Поддержание количества выполняемых параллельно задач
В продолжение вопроса Поддержание выполнения определенного числа задач Грубо говоря мне надо выполнить 10^9 раз метод на объекте. Для экономии времени на многоядерном процессоре, я хотел запускать метод на на нескольких копиях объекта, причем относительно небольшими порциями, чтобы количество выполняемых методов оставалось всегда одинаковым. Я набросал простой пример, логика выполнения и взаимодействия в ообщем-то соблюдена.
Класс данных - в нем только три поля и конструктор
/// <summary>
/// Класс данных
/// </summary>
public class SystemData
{
/// <summary>
/// Число элементов
/// </summary>
public int Number;
/// <summary>
/// Число запасных
/// </summary>
public int SpareNumber;
/// <summary>
/// Вероятность
/// </summary>
public double probability;
/// <summary>
/// Конструктор по умолчанию
/// </summary>
public SystemData ()
{
Number=1;
SpareNumber=0;
probability=1.0;
}
/// <summary>
/// Конструктор с всеми параметрами
/// </summary>
/// <param name="Number">Число элементов</param>
/// <param name="SpareNumber">Число запасных</param>
/// <param name="probability">Вероятность</param>
public SystemData(int Number, int SpareNumber, double probability)
{
this.Number = Number;
this.SpareNumber = SpareNumber;
this.probability = probability;
}
Класс модели - в нем те же три поля, необходимый для моделирования список, генератор случайных чисел, и единственный метод, выдающий bool.
/// <summary>
/// Класс модели
/// </summary>
public class SystemModel
{
/// <summary>
/// Число элементов
/// </summary>
public int Number;
/// <summary>
/// Число запасных
/// </summary>
public int SpareNumber;
/// <summary>
/// Вероятность
/// </summary>
public double probability;
/// <summary>
/// Список инцидентов
/// </summary>
private List<double> AccidentsList;
/// <summary>
/// Генератор случайных чисел
/// </summary>
private Random RND;
/// <summary>
/// Конструктор по объекту данных
/// </summary>
/// <param name="Data"></param>
public SystemModel(SystemData Data)
{
Number = Data.Number;
SpareNumber = Data.SpareNumber;
probability = Data.probability;
AccidentsList = new List<double>();
RND = new Random();
for (int index=0; index<SpareNumber;index++)
{
AccidentsList.Add(RND.NextDouble());
}
}
/// <summary>
/// Метод однократного моделирования
/// </summary>
/// <returns>ложь, если запасных не хватило</returns>
public bool Modeling()
{
for (int index = 0; index < Number; index++)
{
if (AccidentsList.Count() == 0)
{
return false;
}
if (RND.NextDouble() > probability)
{
AccidentsList.RemoveAt(0);
}
}
return true;
}
}
Класс для моделирования заданного числа реализаций - два поля и единственный метод, выдающий число полученных false.
/// <summary>
/// Модель на которой выполняется моделирование
/// </summary>
private SystemModel Model;
/// <summary>
/// Количество выполняемых реализаций
/// </summary>
private long Cluster;
/// <summary>
/// конструктор со всеми параметрами
/// </summary>
public OneTask(SystemModel Model, long Cluster)
{
this.Model = Model;
this.Cluster = Cluster;
}
/// <summary>
/// Метод моделирует заданное число реализаций
/// </summary>
/// <returns>количество отказов</returns>
internal long FaultsModeling()
{
long NumberFaults = 0;
for (long IndexModeling = 0; IndexModeling < Cluster; IndexModeling++)
{
if (!Model.Modeling())
{
NumberFaults++;
}
}
return NumberFaults;
}
Класс моделирования в нескольких "потоках"
/// <summary>
/// Класс для моделирования в заданном числе потоков
/// </summary>
public class MultiTask
{
/// <summary>
/// объект данных
/// </summary>
private SystemData Data;
/// <summary>
/// Число выполняемых реализаций
/// </summary>
private long NumberModeling;
/// <summary>
/// Число реализаций выполняемых в одном потоке
/// </summary>
private long Cluster;
/// <summary>
/// Число параллельных потоков
/// </summary>
private int NumberTasks;
/// <summary>
/// Конструктор со всеми параметрами
/// </summary>
/// <param name="Data"></param>
/// <param name="NumberModeling"></param>
/// <param name="Cluster"></param>
/// <param name="NumberTasks"></param>
public MultiTask(SystemData Data, long NumberModeling, long Cluster, int NumberTasks)
{
this.Data = Data;
this.NumberModeling = NumberModeling;
this.Cluster = Cluster;
this.NumberTasks = NumberTasks;
}
public double GetResult()
{
//Общее число отказов
long TotalFaults = 0;
//создаем список задач
List<Task<long>> ModelingTasks = new List<Task<long>>();
//определяем необходимое число выполненных задач
long TotalTasks = (long)Math.Floor((double)(NumberModeling / Cluster));
//счетчик выполненных задач
long NumberFinishedTasks = 0;
//текущий результат
long currentResult;
//временная модель системы
SystemModel tempModel;
//Временная переменная задачи
Task<long> tempTask;
//Заполняем список задач
for (int taskIndex = 0; taskIndex < NumberTasks; taskIndex++)
{
//Создаем новый объект модели системы
tempModel = new SystemModel(Data);
//создаем новую задачу
tempTask = new Task<long>(() => { return new OneTask(tempModel, Cluster).FaultsModeling(); });
//добавляем новую задачу в список
ModelingTasks.Add(tempTask);
//Запускаем задачу
tempTask.Start();
}
//выполняем цикл, пока не достигнем нужного числа выполненных задач
while (NumberFinishedTasks < TotalTasks)
{
//ждем, пока выполнится одна из задач
Task<long> completedTask = (Task.WhenAny(ModelingTasks)).Result;
//Получаем результат выполненной задачи
currentResult = completedTask.Result;
//удаляем задачу из списка
ModelingTasks.Remove(completedTask);
completedTask.Dispose();
//прибавляем число отказов в этой задаче к общему числу ошибок
TotalFaults += currentResult;
//увеличиваем счетчик выполненных задач
NumberFinishedTasks++;
//Создаем новый объект модели системы
tempModel = new SystemModel(Data);
// создаем новую задачу
tempTask = new Task<long>(() => { return new OneTask(tempModel, Cluster).FaultsModeling(); });
//Вставляем новую задачу в список
ModelingTasks.Add(tempTask);
//Запускаем новую задачу
tempTask.Start();
}
//общее число реализаций моделирования
long TotalModeling = TotalTasks * Cluster;
//Вычисляем итоговую вероятность
double result =1- (double)TotalFaults / (double)TotalModeling;
//возвращаем результат
return result;
}
}
Всё это просто проверил в консоли
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Введите число элементов");
int Number = int.Parse(Console.ReadLine());
Console.WriteLine("Введите число запасных");
int SpareNumber = int.Parse(Console.ReadLine());
Console.WriteLine("Введите вероятность работы");
double probality = double.Parse(Console.ReadLine());
SystemData Data = new SystemData(Number, SpareNumber, probality);
Console.WriteLine("Введите необходимое число моделирований");
long NumberModeling = long.Parse(Console.ReadLine());
Console.WriteLine("Введите число моделирований в одном кластере");
long Cluster = long.Parse(Console.ReadLine());
Console.WriteLine("Введите число параллельных потоков");
int NumberTasks = int.Parse(Console.ReadLine());
MultiTask MultiModel = new MultiTask(Data, NumberModeling, Cluster, NumberTasks);
double result = MultiModel.GetResult();
Console.WriteLine("Получен результат: {0}", result);
}
}
Тестовый пример 100 - 12 -0,9. 10^8 реализаций на одном потоке выполняются без проблем. Но если сделать 10 потоков и 10^6 реализаций, то появляется ошибка, связанная со списком AccidentsList из класса SystemModel (Индекс за пределами диапазона. Индекс должен быть положительным числом, а его размер не должен превышать размер коллекции.). Хотя ровно перед удалением элемента из списка происходит проверка, что список не пустой. И список создается заново в конструкторе, у каждого объекта он свой. Откуда может быть такая особенность? Что я делаю не так? Или это Task.WhenAny так странно работает?
Еще раз спасибо @EvgeniyZ за критику и предложения по моему вопросу, постарался учесть.
Ответы (1 шт):
//временная модель системы
SystemModel tempModel;
...
for (int taskIndex = 0; taskIndex < NumberTasks; taskIndex++)
{
...
//Создаем новый объект модели системы
tempModel = new SystemModel(Data);
^^^^^^^^^
// создаем новую задачу
tempTask = new Task<long>(() => { return new OneTask(tempModel, Cluster).FaultsModeling(); });
^^^^^^^^^
...
}
У вас типичная проблема с замыканием. В создаваемую задачу передаётся ссылка на внешний объект, который через какое-то время заменяется на другой (ссылка остаётся та же, а объект по этой ссылке уже другой). В итоге получается, что после полного прохождения цикла по задачам, все ваши задачи работают с одной и той же SystemModel
- с той, которая была создана на последнем проходе цикла. Таким образом, у вас получается многопоточная работа с одним и тем же AccidentsList
внутри одной SystemModel
. Вот и получается, что два (или более) потока одновременно проверили, что AccidentsList.Count()
не равен нулю и пошли делать AccidentsList.RemoveAt(0);
, но если в списке был, к примеру, только один элемент, удалось удаление только тому потоку, который успел раньше, а остальные упадут с ошибкой, что удалять то уже и нечего. Типичная "гонка потоков".
// это условие одновременно проверяют несколько потоков
if (AccidentsList.Count() == 0)
{
return false;
}
// сюда может попасть несколько потоков
if (RND.NextDouble() > probability)
{
// элемент в списке один, а потоков сюда зашло много => исключение
AccidentsList.RemoveAt(0);
}
В общем проблема легко решается созданием нового объекта внутри создаваемой задачи (а не снаружи):
// создаем новую задачу и создаем новый объект модели системы
tempTask = new Task<long>(() => { return new OneTask(new SystemModel(Data), Cluster).FaultsModeling(); });
^^^^^^^^^^^^^^^^^^^^^
Я бы вообще создание этого объекта внутрь OneTask
перенёс так то.
В общем, после этого изменения мультитаск работает без ошибок.
P.S. Мораль простая. Нужно тщательно следить, чтобы параллельно обрабатывались разные (не одни и те же) данные. А при целенаправленной параллельной обработке одних и тех же разделяемых данных нужно использовать примитивы синхронизации.