Как "схлопнуть" вызовы, чтобы вызываемый код не "захлебнулся"?
Может есть какой-то готовый код, чтобы не изобретать велосипеды.
Сценарий такой. В коде генерируются некие обновления, которые затем обрабатываются неким методом. Обновления могут генерироваться слишком часто, плюс метод обработки может тормозить, в результате обновления не успевают обрабатываться, накапливаются, обработчик потом делает лишнюю работу. Хотелось бы как-то "накопить" обновления, пока обработчик занят. Причём, сами данные для обновления уже "накапливаются", об этом заботиться не нужно, нужно только разрулить "накопление" запросов к обработчику, чтобы когда обработчик освободиться обрабатывать не все накопившиеся запросы на обновление по отдельности, а за один раз сразу весь пакет "обновлений" обработать.
Как сейчас:
async Task UpdateFoo()
{
...
await SaveAsync();
}
async Task UpdateBar()
{
...
await SaveAsync();
}
async Task UpdateBaz()
{
...
await SaveAsync();
}
async Task SaveAsync()
{
// здесь нужно сделать так,
// чтобы одновременно обрабатывался только один запрос
// по окончании которого проверялось бы не было ли ещё запросов
// и если были, то обновление запускалось бы ещё один раз (сразу за всё "накопленное")
// и так пока есть обновления по окончании очередной обработки
}
Наверняка логика должна быть какая-то простая довольно, и сценарий довольно типовой, но не соображу, как нормально написать. Для того, чтобы зайти один раз - понятно, SemaphoreSlim
, видимо (с WaitAsync
). А вот остальное как лучше сделать?
Ответы (3 шт):
Общий пример, берём System.Threading.Channels.Channel<T>
.
private readonly Channel<T> _channel = Channel<T>.CreateUnbounded();
Пишем метод разрегребания
private Task _workerTask;
private async Task WorkerAsync()
{
ChannelReader<T> reader = _channel.Reader;
List<T> list = new();
// ждём здесь, если в канале пусто
while (await reader.WaitToReadAsync())
{
// забираем всё что есть
// ну или можно счётчиком ограничить максимальное количество выгребаемых данных за раз
while (reader.TryRead(out T data))
{
list.Add(data);
}
// пачка собрана, погнали. это можно в try-catch завернуть, чтобы воркер не падал
await UseAsync(list);
list.Clear();
}
}
Стартуем воркер
_workerTask = WorkerAsync();
Чтобы закинуть в канал
_channel.Writer.TryWrite(x);
Чтобы закрыть канал (насовсем), и чтобы метод WorkerAsync
завершился, нужно вызвать
_channel.Writer.Complete();
await _workerTask;
Вот и вся магия.
Вот код на основании вашего самоответа, с точно такой же логикой
private readonly Channel<T> _channel = Channel<T>.CreateBounded(1); // максимум один в очереди
private async Task WorkerAsync()
{
await foreach (var data in _channel.Reader.ReadAllAsync())
{
try
{
await _producer.SendAsync(data);
}
catch (Exception ex)
{
_logger.Error(ex, "Ошибка отсылки обновлений");
}
}
}
/// <summary>
/// Отсылка с ограничениями: один поток отсылает, ещё один может ждать в очереди, остальные просто возвращаются
/// </summary>
/// <returns></returns>
private void SendUpdatedData()
{
if (!channel.Writer.TryWrite(GetData())
{
_logger.Debug("Уже есть ожидание отсылки обновления...");
}
}
Здесь только консьюмера запустить один раз на всё вермя работы приложения, а как его остановить, показано выше - _channel.Writer.Complete()
Task task = WorkerAsync(); // ну или `_ = ...`
Смысл этого таска для того что если у вас приложения завершается, вы например можете вызвать Complete()
и через await task;
дождаться когда все запросы воркером обработаются, после чего нормально завершить работу. А в предложенном вами решении такой возможности не предусмотрено.
Если все вызовы SaveAsync идут из одного потока, то можно реализовать так:
Помещаем в очередь изменение
Если работа по сохранению уже в процессе, ждём её завершения
Если очередь не пуста
- забираем всё из неё
- свою задачу сохраняем как ожидающую завершения
Иначе
- ждём завершения сохранения
Если вызовы могут быть многопоточными, то надо офигенно аккуратно расставить локи так, чтобы они покрывали все дайствия, но не время ожидания.
Что-то такое (но ничего не гарантирую, и вообще, писал прямо тут):
object lobj = new();
List<...> changes = new();
Task active = Task.FromResult();
async Task SaveAsync()
{
Task cur;
lock (lobj)
{
changes.Add(...);
cur = active;
}
await cur;
lock (lobj)
{
if (changes.Count !== 0)
{
active = cur = SaveAsyncReal(changes.ToList());
changes.Clear();
}
else
cur = active;
}
await cur;
}
async Task SaveAsyncReal(List<...> diff)
{
...
}
Но если сюда ещё и обработку ошибок надо присобачить, то это вообще долго думать надо...
Сам пока сделал на таске и двух семафорах, хотя другие решения тоже интересные, но не совсем подходят под мою задачу. Суть в том, что одно обновление работает, другое ожидает в очереди, все остальные обновления отбрасываются - они излишни, второго ожидающего обновления достаточно.
private void SendUpdatedData()
{
_ = Task.Run(async () => { await SendAsync(); });
}
/// <summary>
/// Отсылка с ограничениями: один поток отсылает, ещё один может ждать в очереди, остальные просто возвращаются
/// </summary>
/// <returns></returns>
private async Task SendAsync()
{
if (_semaphore2.CurrentCount == 0)
{
_logger.Debug("Уже есть ожидание отсылки обновления...");
return;
}
await _semaphore2.WaitAsync();
try
{
await _semaphore1.WaitAsync();
try
{
var data = GetData();
await _producer.SendAsync(data);
}
finally
{
_semaphore1.Release();
}
}
catch (Exception ex)
{
_logger.Error(ex, "Ошибка отсылки обновлений");
}
finally
{
_semaphore2.Release();
}
}
Покритикуйте, пожалуйста. )