Поддержание выполнения определенного числа задач
У меня выполняется статистическое моделирование и для ускорения процесса на многоядерном процессоре хочется запускать его параллельными задачами.
У меня есть вот такой код:
/// <summary>
/// Моделирование по исходным данным на заданное количество реализаций
/// </summary>
/// <param name="Data">Исходные данные</param>
/// <param name="NumberTasks">Число потоков</param>
/// <param name="Cluster">Число реализаций на поток</param>
/// <param name="NumberModeling">Необходимое число реализаций</param>
/// <param name="token">Токен отмены моделирования</param>
/// <param name="Progress">Интерфейс прогресса для отображения в интерфейсе</param>
/// <returns></returns>
public static ModelingResult Modeling(StaticSystem Data, long Cluster, long NumberModeling, CancellationToken token, IProgress<double> Progress)
{
//Создаем новый таймер
Stopwatch Timer = new Stopwatch();
//запускаем таймер
Timer.Start();
//Создаем объект результат моделирования
ModelingResult result = new ModelingResult();
//сохраняем текущую дату
DateTime Date = DateTime.Now;
//Записываем дату в результат
result.Date=Date;
int NumberTasks = Data.Properties.NumberTasks;
//создаем массив моделей системы
DynamicSystem[] SystemModels = Factory.ProduceModels(Data, NumberTasks);
//создаем список задач
List<Task<long>> ModelingTasks = new List<Task<long>>();
OneTask[] TasksObjects = new OneTask[NumberTasks];
//определяем необходимое число выполненных задач
long TotalTasks = (long)Math.Floor((double)(NumberModeling / Cluster));
//счетчик выполненных задач
long NumberFinishedTasks = 0;
//счетчик отказов
long NumberFaults = 0;
//номер текущей задачи
int taskIndex;
//устанавливаем номер текущей задачи
int currentTaskIndex = NumberTasks;
// заполняем массив задач и запускаем их
long currentResult=0;
//Временная переменная задачи
Task<long> tempTask;
for (taskIndex = 0; taskIndex < NumberTasks; taskIndex++)
{
//для каждого потока создаем отдельный объект
TasksObjects[taskIndex] = new OneTask(SystemModels[taskIndex], Data.Properties, Cluster);
//создаем новую задачу по объекту с нужным индексом
tempTask = new Task<long>(() => { return TasksObjects[taskIndex].Modeling(); });
//добавляем новую задачу в список
ModelingTasks.Add(tempTask);
//Запускаем задачу
tempTask.Start();
if (ModelingTasks.IndexOf(tempTask) != taskIndex)
{
throw new Exception(Glossary.Messages["UnknownError"]);
}
}
taskIndex = 0;
//выполняем цикл, пока не достигнем нужного числа выполненных задач
while (NumberFinishedTasks < TotalTasks)
{
// Если поступил запрос на отмену моделирования
if (token.IsCancellationRequested)
{
//возвращаем пустой результат
return null;
}
//если номер выполненной задачи входит в нужный интервал
if (currentTaskIndex < NumberTasks)
{
// создаем новую задачу с теми же параметрами
tempTask = new Task<long>(() => { return TasksObjects[currentTaskIndex].Modeling(); });
//Вставляем новую задачу в список по нужному индексу
ModelingTasks.Insert(currentTaskIndex,tempTask);
//Запускаем новую задачу
tempTask.Start();
if (ModelingTasks.IndexOf(tempTask) != currentTaskIndex)
{
throw new Exception(Glossary.Messages["UnknownError"]);
}
}
//ждем, пока выполнится одна из задач
Task<long> completedTask = (Task.WhenAny(ModelingTasks)).Result;
// и получаем ее индекс
currentTaskIndex = ModelingTasks.IndexOf(completedTask);
//Получаем результат выполненной задачи
currentResult = completedTask.Result;
if (!ModelingTasks.Contains(completedTask))
{
throw new Exception(Glossary.Messages["UnknownError"]);
}
//удаляем задачу из списка
ModelingTasks.Remove(completedTask);
//прибавляем число отказов в этой задаче к общему числу ошибок
NumberFaults += currentResult;
//увеличиваем счетчик выполненных задач
NumberFinishedTasks++;
double percent = (double)NumberFinishedTasks * (double)Cluster / (double)NumberModeling;
Progress.Report(percent);
completedTask.Dispose();
}
long TotalModeling = TotalTasks * Cluster; //общее число реализаций моделирования
double probability, lowProbability, highProbability;
probability = 1 - ((double)NumberFaults / (double)TotalModeling); //вычисляем точечное значение показателя
result.Probability = probability; //записываем его в объект результата
Basic.CalculateConfidenceBoards(probability, Data.Properties.ConfidenceProbability, TotalModeling, out lowProbability, out highProbability); //вычисляем доверительные границы показателя
result.LowBoard = lowProbability; //записываем нижнюю границу
result.HighBoard = highProbability; //записываем верхнюю границу
Timer.Stop(); //останавливаем таймер
result.TotalTimeOfModeling = (Timer.ElapsedMilliseconds)/1000.0; //записываем время выполнения в результаты
return result; //возвращаем результат
Проблема в том, что пока я его запускаю с одним потоком (NumberTasks), всё прекрасно работает. Если же запускаю больше одного потока, то постоянно вываливаются разные ошибки, в основном связанные с тем, что не находит нужного индекса в списках уже в реализации модели. При этом при малом количестве реализаций (NumberModeling) бывает и проскакивает. С чем это может быть связано?
Ответы (1 шт):
В вашем случае нужно весьма долго изучать что такое потоки, и как это работает. Причём, нужно основательно в этом разобраться, потому что всякое "производное" - следует строго из базовых вещей. Всё всегда построено на "базе", а производное - всегда суть обёртки, синтаксический сахар, и так далее. Краткая суть здесь всегда следующая.
Существует метод, метод исполняется на стеке, т.е. стек - существует. Но это не точно, и это уже проблема.
Существует куча, там экземпляры объектов лежат. Куча существует всегда.
Стеков может быть несколько, это очень важное понимание. Именно под несколько потоков создаются "свои" стеки. Но тут и есть проблема. Никогда не известно - а существует ли тот или иной стек. Иначе говоря вы запускаете три потока, это три стека. Но когда они создаются и создаются ли - это не известно. И когда завершаются, и завершаются ли - это тоже не известно.
Из пункта 3 следует весьма не банальность - никогда нельзя пересекать стеки, потому что сам смысл пересечения стеков - не определён, потому что не известно, а существует ли тот или иной стек?
Как быть? Хранить результаты работы стеков в/на куче, ибо куча всегда существует.
Простейший код объясняющий работу многопотока. Вы пишете у вас есть объект, у объекта есть метод, но опять же вы пишете, что объекты могут быть для потоков свои, и это весьма правильно.
Создадим класс объекта, не экземпляр, но класс. Где простейшее поле, и метод увеличивающий это поле.
public class MyObject
{
public long Field;
public void DoMethod()
{
Field += 1;
}
}
Далее как обычно пусть консоль, там Program и Main() как точка входа. Теперь нужны пусть 4 экземпляра класса MyObject, и 4 потока на каждый экземпляр. Естественно нужно замерить время работы. И 4 метода для каждого потока, поток - это метод, а метод - это стек для метода, и эти методы т.е. стеки - не пересекаются, т.е. методы в своих стеках, а результаты работы методов пишутся в кучу, в свои собственные поля Field каждого экземпляра MyObject.
Для работы System.Diagnostics.Stopwatch нужно подцепить сборку System
public static class Program
{
public static MyObject obj0, obj1, obj2, obj3;
public static void Main()
{
System.Console.WriteLine("Any to start");
System.Console.ReadLine();
// Это дополнительный "код", он "барабанит" ядро процессора
// просто так, тем самым повышая частоту проца до максимальной
// Такой код обычно называется "прогревашкой"
long tmp = 0;
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
for (long i = 0; i < 4000000000; i++) tmp += 1;
sw.Stop();
System.Console.WriteLine("Progrevashka time {0}ms", sw.ElapsedMilliseconds);
// Начало собственно кода здесь, создаём экземпляры в/на куче
// для каждого потока свой Object
obj0 = new MyObject();
obj1 = new MyObject();
obj2 = new MyObject();
obj3 = new MyObject();
// Создаём потоки, указываем методы
// Методы для каждого потока свои, это четыре стека
System.Threading.Thread
t0 = new System.Threading.Thread(ThreadMethod0),
t1 = new System.Threading.Thread(ThreadMethod1),
t2 = new System.Threading.Thread(ThreadMethod2),
t3 = new System.Threading.Thread(ThreadMethod3);
// Запускаем потоки, создаются стеки, и на стеках работают методы
t0.Start();
t1.Start();
t2.Start();
t3.Start();
// Вот здесь всё сложнее, каждый поток возвращает результат потока
// обычно это ноль, если всё идёт "хорошо", но если что то идёт
// не так, но это не ноль, и строго говоря нужно проверять
// чем завершился поток
t0.Join();
t1.Join();
t2.Join();
t3.Join();
// Все потоки завершили работу, суммируем поля объектов
// которые на куче и всегда существуют
System.Console.WriteLine(
obj0.Field +
obj1.Field +
obj2.Field +
obj3.Field);
System.Console.ReadLine();
}
// Методы для потоков, где каждый метод работает со своим объектом
// и метод никогда не лезет в другой метод другого потока, т.е. стека
// ибо не известно, а существует ли другой стек
public static void ThreadMethod0()
{
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
for (long i = 0; i < 1000000000; i++)
{
obj0.DoMethod();
}
sw.Stop();
System.Console.WriteLine("Thread0 time {0}ms", sw.ElapsedMilliseconds);
}
// Ещё метод 1
public static void ThreadMethod1()
{
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
for (long i = 0; i < 1000000000; i++)
{
obj1.DoMethod();
}
sw.Stop();
System.Console.WriteLine("Thread1 time {0}ms", sw.ElapsedMilliseconds);
}
// Ещё метод 2
public static void ThreadMethod2()
{
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
for (long i = 0; i < 1000000000; i++)
{
obj2.DoMethod();
}
sw.Stop();
System.Console.WriteLine("Thread2 time {0}ms", sw.ElapsedMilliseconds);
}
// Ещё метод 3
public static void ThreadMethod3()
{
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
for (long i = 0; i < 1000000000; i++)
{
obj3.DoMethod();
}
sw.Stop();
System.Console.WriteLine("Thread3 time {0}ms", sw.ElapsedMilliseconds);
}
}
Результаты на древней предельно стабильной тестовой станции
i5-3570k all 4 cores locked, Z77, no-HT, Framework 4.8.2 x64/release
Any to start
Progrevashka time 1487ms
Thread0 time 2269ms
Thread2 time 2281ms
Thread1 time 2300ms
Thread3 time 2337ms
4000000000
Если в 1 поток, результат в районе 6600ms
Суть тут следующая, многопоток - это одна из сложнейших тем, и разбираться нужно с самого начала. Потому что как было написано выше, без понимания базовых вещей лезть в ещё более сложные производные вещи от базовых - бессмысленно.
Базовое программирование многопотока - это свой стек/метод/объект на каждый поток. И синхронизация результатов строго в конце, это база многопотока. Пожалуйста, разберитесь в простейшем коде, и почитайте соответствующую литературу, и уже далее действительно можно [наверное] пробовать писать сложные вещи, а вещи здесь действительно - весьма сложные.
Синхронное/асинхронное программирование ещё сложнее устроено.
Пс: Не для автора вопроса. Постоянное пересоздание стека, т.е. не переиспользование стека - это вопрос философии. Потоки создаются что бы поработать и самозакрыться. А создать новый поток - не проблема. Для простейших проектов и простейшего распараллеливания - это очень простой и надёжный путь, создавать потоки под задачи, и не держать потоки на сложной синхронизации. Опять же по замерам миллион потоков [т.е. миллион стеков] с простейшей операцией на потоке/стеке создаются и прибиваются миллион штук в секунду, протестировано.