C# Производительный RateLimiter и возможные deadlocks

Реализовал несложный класс, реализующий концепцию маркерной корзины. Поскольку планируется распространённое применение данного класса в проекте, важно чтобы он был предельно оптимизирован для работы в высоконагруженной среде + были учтены возможные сценарии, при которых возникает deadlock. Какие ваши рекомендации по улучшению этого класса?

public sealed class TokenBucketRateLimiterOptions {

    public TokenBucketRateLimiterOptions() { }

    public int QueueLimit { get; set; }
    public int TokenLimit { get; set; }
    public TimeSpan ReplenishmentInterval { get; set; }

    public TokenBucketRateLimiterOptions Clone() => (TokenBucketRateLimiterOptions)MemberwiseClone();
}

public sealed class TokenBucketRateLimiter : IDisposable {

    private class PermitsRequest : TaskCompletionSource<bool>, IDisposable {

        private bool Disposed = false;
        private CancellationToken CancellationToken;
        private CancellationTokenRegistration TokenRegistration;

        public int PermitCount { get; private set; }

        public PermitsRequest(int permitCount, TokenBucketRateLimiter rateLimiter,
            CancellationToken cancellationToken = default) :
            base(TaskCreationOptions.RunContinuationsAsynchronously) {

            CancellationToken = cancellationToken;
            if (cancellationToken.CanBeCanceled) {
                TokenRegistration = cancellationToken.UnsafeRegister(OnCanceled, rateLimiter);
            }
        }

        private void OnCanceled(object state) {
            if (TrySetCanceled(CancellationToken)) {
                var rateLimiter = state as TokenBucketRateLimiter;
                lock (rateLimiter.SyncLock) rateLimiter.QueuedTokenCount -= PermitCount;
                rateLimiter.HandleQueue();
            }
        }

        public void Dispose() {
            if (Disposed) return;
            Disposed = true;
            TokenRegistration.Dispose();
        }
    }

    private readonly object SyncLock = new object();
    private readonly CancellationTokenSource TimerCts = new();

    private bool Disposed = false;
    private volatile int TokenCount;
    private volatile int QueuedTokenCount;

    private TokenBucketRateLimiterOptions Options;
    private ConcurrentQueue<PermitsRequest> Queue = new();

    private TokenBucketRateLimiter() { }

    public static TokenBucketRateLimiter Create(TokenBucketRateLimiterOptions options) {

        var tokenBucketRateLimiter = new TokenBucketRateLimiter();
        tokenBucketRateLimiter.Options = options.Clone();
        tokenBucketRateLimiter.TokenCount = options.TokenLimit;

        Task.Factory.StartNew(async () => {
            while (!tokenBucketRateLimiter.TimerCts.Token.IsCancellationRequested) {
                await Task.Delay(options.ReplenishmentInterval, tokenBucketRateLimiter.TimerCts.Token);
                tokenBucketRateLimiter.Replenish();
            }
        }, CancellationToken.None,
        TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
        TaskScheduler.Default);

        return tokenBucketRateLimiter;
    }

    public bool Acquire(int permitCount = 1) {

        ThrowIfDisposed();
        if (permitCount > Options.TokenLimit)
            throw new ArgumentOutOfRangeException(nameof(permitCount), "");

        lock (SyncLock) {
            if (TokenCount < permitCount) return false;
            TokenCount -= permitCount;
            return true;
        }
    }

    public ValueTask<bool> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default) {

        ThrowIfDisposed();
        if (permitCount > Options.TokenLimit)
            throw new ArgumentOutOfRangeException(nameof(permitCount), "");

        lock (SyncLock) {
            if (TokenCount < permitCount) {
                if (QueuedTokenCount + permitCount > Options.QueueLimit)
                    return new ValueTask<bool>(false);
                var request = new PermitsRequest(permitCount, this, cancellationToken);
                Queue.Enqueue(request);
                QueuedTokenCount += permitCount;
                return new ValueTask<bool>(request.Task);
            }
            TokenCount -= permitCount;
            return new ValueTask<bool>(true);
        }
    }

    public void Replenish() {
        ThrowIfDisposed();
        lock (SyncLock) TokenCount = Options.TokenLimit;
        HandleQueue();
    }

    public void Dispose() {

        if (Disposed) return;
        Disposed = true;

        TimerCts.Cancel();
        var disposing = new SinglyLinkedList<PermitsRequest>();
        lock (SyncLock) {
            while (Queue.TryDequeue(out var request)) {
                request.SetResult(false);
                disposing.Add(request);
            }
        }

        foreach (var request in disposing) {
            request.Dispose();
        }
        TimerCts.Dispose();
    }

    private void HandleQueue() {

        var disposing = new SinglyLinkedList<PermitsRequest>();

        lock (SyncLock) {

            if (TokenCount == 0) return;

            while (Queue.TryPeek(out var request)) {
                if (request.Task.IsCompleted) {
                    Queue.TryDequeue(out _);
                    disposing.Add(request);
                }
                else if (request.PermitCount <= TokenCount) {
                    Queue.TryDequeue(out _);
                    if (request.TrySetResult(true)) {
                        QueuedTokenCount -= request.PermitCount;
                        TokenCount -= request.PermitCount;
                    }
                    disposing.Add(request);
                }
                else {
                    break;
                }
            }
        }

        foreach (var request in disposing) {
            request.Dispose();
        }
    }

    private void ThrowIfDisposed() {
        if (Disposed) {
            throw new ObjectDisposedException(nameof(TokenBucketRateLimiter));
        }
    }
}

UPD: Обновлённая версия с поддержкой нескольких временных интервалов. Конкретно здесь несколько проблем:

  1. Если вызывать Dispose для ненужных PermitsRequestHandleQueue, который как раз вызывается в том числе и при отмене какого либо запроса на токены через CancellationToken), await этих запросов со стороны пользователя будет выбрасывать не OperationCanceledException, а ObjectDisposedException. В общем, не совсем понимаю как правильно здесь утилизировать объекты запросов (или об этом должен заботиться пользователь)...

  2. Повсеместное использование lock'ов - не уверен на сколько это проблема, но если есть способы ускорить этот класс, то было бы хорошо.

  3. Скорее небольшой вопрос, чем проблема - не лучше ли сделать отдельную версию класса для случая с одной корзиной (наиболее распространённый вариант использования) или это не особо повлияет на производительность? Лично я предполагаю, что можно не заморачиваться с этим.

  4. Таймер может срабатывать немного не точно. Как можно было бы нивелировать это для обеспечения максимальной точности? Может быть создать поле, в которое записывать ожидаемое точное время следующего срабатывания таймера, и при срабатывании сравнивать с текущим временем (чтобы скорректировать следующий интервал для таймера)?

  5. Требуется так же встроенная функциональность восстановления состояния. То есть оставалось определённое количество токенов до истечения указанного интервала - после перезагрузки системы оно должно быть восстановлено (при том даже при остановке процессе). Как вариант можно назначать каждому RateLimiter некоторый ID (передавать в конструктор, если пользователю нужна такая фича) и сохранять в реестр условно каждые 10 минут остатки токенов, текущее время и время ожидаемого пополнения. При запуске же всё это проверяется и устанавливается нужное значение (либо полная корзина, либо сохранённое значение остатков токенов из реестра). Может кто то предложит лучшее решение задачи.

    public sealed class MultiIntervalRateLimiterOptions : TokenBucketRateLimiterOptions {

     public struct LimitInfo {
         public int TokenLimit;
         public TimeSpan Interval;
     }
    
     public IEnumerable<LimitInfo> Limits { get; set; }
    

    }

    public sealed class TokenBucketRateLimiter : IDisposable {

     private class Bucket : IDisposable {
    
         private readonly System.Threading.Timer _Timer;
         private readonly int Capacity;
         private readonly TimeSpan ReplenishmentInterval;
    
         private bool Disposed;
         private int TokenCount;
    
         public Bucket(int capacity, TimeSpan? replenishmentInterval, object syncLock) {
    
             if (capacity <= 0)
                 throw new ArgumentOutOfRangeException(nameof(capacity));
             if (syncLock == null)
                 throw new ArgumentNullException(nameof(syncLock));
    
             Capacity = capacity;
             ReplenishmentInterval = replenishmentInterval;
             TokenCount = capacity;
    
             if (replenishmentInterval != null) {
                 _Timer = new System.Threading.Timer(_ => {
                     lock (syncLock) ReplenishSync(); // Синхронизация с ручным вызовом Replenish для TokenBucketRateLimiter
                     ChangeTimer(callbackTime); // Необходимо для поддержки различных сценариев планирования пополнения (в данной версии поддерживает только пополнение через равные временные интервалы)
                 });
                 ChangeTimer(); // Инициируем таймер
             }
         }
    
         // Проверяет можно ли извлечь заданное количество токенов из корзины
         // Необходимо для случая нескольких корзин - сначала проверяем каждую корзину, далее извлекаем токены (либо токены извлекаются из всех корзин в равном количестве, либо ни из одной)
         public bool CanConsumeSync(int permitCount = 1) {
    
             ThrowIfDisposed();
             if (permitCount > Capacity)
                 throw new ArgumentOutOfRangeException(nameof(permitCount), "");
    
             return TokenCount >= permitCount;
         }
    
         // Извлекает токены из корзины
         public bool ConsumeSync(int permitCount = 1) {
    
             ThrowIfDisposed();
             if (permitCount > Capacity)
                 throw new ArgumentOutOfRangeException(nameof(permitCount), "");
    
             if (TokenCount < permitCount) return false;
             TokenCount -= permitCount;
             return true;
         }
    
         // Пополнение корзины
         public void ReplenishSync() {
             ThrowIfDisposed();
             TokenCount = Capacity;
         }
    
         public void Dispose() {
             ThrowIfDisposed();
             Disposed = true;
             _Timer.Dispose();
         }
    
         private void ChangeTimer(DateTime callbackTime) {
             _Timer.Change(ReplenishmentInterval - (DateTime.UtcNow - callbackTime), Timeout.InfiniteTimeSpan);
         }
    
         private void ThrowIfDisposed() {
             if (Disposed) {
                 throw new ObjectDisposedException(nameof(TokenBucketRateLimiter));
             }
         }
     }
    
     private class PermitsRequest : IValueTaskSource<bool>, IDisposable {
    
         private ManualResetValueTaskSourceCore<bool> TaskSourceCore;
         private readonly CancellationToken _CancellationToken;
         private readonly CancellationTokenRegistration TokenRegistration;
    
         private bool Disposed;
    
         public int PermitCount { get; private set; }
    
         public short Version => TaskSourceCore.Version;
    
         public PermitsRequest(int permitCount, TokenBucketRateLimiter rateLimiter, CancellationToken cancellationToken = default) {
    
             TaskSourceCore = new ManualResetValueTaskSourceCore<bool> {
                 RunContinuationsAsynchronously = true
             };
             _CancellationToken = cancellationToken;
             PermitCount = permitCount;
    
             if (cancellationToken.CanBeCanceled) {
                 TokenRegistration = cancellationToken.UnsafeRegister(OnCanceled, rateLimiter);
             }
         }
    
         private void OnCanceled(object state) {
    
             var rateLimiter = state as TokenBucketRateLimiter;
             lock (rateLimiter.SyncLock) rateLimiter.QueuedTokenCount -= PermitCount;
    
             TaskSourceCore.SetException(new OperationCanceledException(_CancellationToken));
    
             Debug.WriteLine("Check Status: "+TaskSourceCore.GetStatus(Version)); // ? Pending
             rateLimiter.HandleQueue();
         }
    
         public void Complete(bool result) {
             ThrowIfDisposed();
             TaskSourceCore.SetResult(result);
         }
    
         public bool GetResult(short token) {
             ThrowIfDisposed();
             return TaskSourceCore.GetResult(token);
         }
    
         public ValueTaskSourceStatus GetStatus(short token) {
             ThrowIfDisposed();
             return TaskSourceCore.GetStatus(token);
         }
    
         public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) {
             ThrowIfDisposed();
             TaskSourceCore.OnCompleted(continuation, state, token, flags);
         }
    
         public void Dispose() {
             ThrowIfDisposed();
             Disposed = true;
             TokenRegistration.Unregister();
         }
    
         private void ThrowIfDisposed() {
             if (Disposed) {
                 throw new ObjectDisposedException(nameof(TokenBucketRateLimiter));
             }
         }
     }
    
     private readonly object SyncLock = new object();
    
     private bool Disposed;
     private ReadOnlyCollection<Bucket> Buckets;
     private int QueuedTokenCount; // Количество токенов для всех запросов в очереди (необходимо для ограничения размера очереди)
    
     private TokenBucketRateLimiterOptions Options;
     private Queue<PermitsRequest> Queue = new(); // Очередь запросов на токены
    
     private TokenBucketRateLimiter() { }
    
     public static TokenBucketRateLimiter Create(TokenBucketRateLimiterOptions options) {
    
         var tokenBucketRateLimiter = new TokenBucketRateLimiter() {
             Options = options.Clone()
         };
    
         var buckets = new List<Bucket>();
         if (options is MultiIntervalRateLimiterOptions multiIntervalOptions) {
             foreach (var limit in multiIntervalOptions.Limits) {
                 buckets.Add(new Bucket(limit.TokenLimit, limit.Interval,
                     tokenBucketRateLimiter.SyncLock));
             }
         }
         tokenBucketRateLimiter.Buckets = new ReadOnlyCollection<Bucket>(buckets);
    
         return tokenBucketRateLimiter;
     }
    
     // Запрашивает заданное количество токенов
     public bool Acquire(int permitCount = 1) {
    
         ThrowIfDisposed();
    
         lock (SyncLock) {
             foreach (var bucket in Buckets) {
                 if (!bucket.CanConsumeSync(permitCount))
                     return false;
             }
             foreach (var bucket in Buckets) {
                 bucket.ConsumeSync(permitCount);
             }
             return true;
         }
     }
    
     // Запрашивает заданное количество токенов асинхронно
     // Если токенов в корзине нет, создаёт запрос
     public ValueTask<bool> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default) {
    
         ThrowIfDisposed();
    
         lock (SyncLock) {
             if (Acquire(permitCount))
                 return new ValueTask<bool>(true);
    
             // Не можем встать в очередь
             if (QueuedTokenCount + permitCount > Options.QueueLimit)
                 return new ValueTask<bool>(false);
    
             var request = new PermitsRequest(permitCount, this, cancellationToken);
             Queue.Enqueue(request); // Встаём в очередь
             QueuedTokenCount += permitCount; // Обновляем количество токенов в очереди
             return new ValueTask<bool>(request, request.Version);
         }
     }
    
     // Пополняет все корзины
     public void Replenish() {
         ThrowIfDisposed();
         lock (SyncLock) {
             foreach (var bucket in Buckets)
                 bucket.ReplenishSync();
         }
         HandleQueue();
     }
    
     public void Dispose() {
    
         if (Disposed) return;
         Disposed = true;
    
         // Завершаем и уничтожаем все запросы запросы в очереди
         lock (SyncLock) {
             while (Queue.TryDequeue(out var request)) {
                 try { request.Complete(false); } catch (InvalidOperationException) { }
                 request.Dispose();
             }
         }
    
         foreach (var bucket in Buckets) {
             bucket.Dispose();
         }
     }
    
     // Обработка очереди запросов на токены
     private void HandleQueue() {
    
         lock (SyncLock) {
             // Обрабатываем запросы строго в порядке их поступления в очередь. Запрос может быть уже завершён пользователем - пропуск, либо на него по прежнему нет токенов - завершение метода, либо токены есть - завершение запроса
             while (Queue.TryPeek(out var request)) {
    
                 if (request.GetStatus(request.Version) != ValueTaskSourceStatus.Pending) {
                     Queue.TryDequeue(out _);
                     request.Dispose();
                     continue;
                 }
    
                 bool _break = false;
                 foreach (var bucket in Buckets) {
                     if (_break |= !bucket.CanConsumeSync(request.PermitCount))
                         break;
                 }
                 if (_break) break;
    
                 Queue.TryDequeue(out _);
    
                 try {
                     request.Complete(true);
                     foreach (var bucket in Buckets) {
                         bucket.ConsumeSync(request.PermitCount);
                     }
                     QueuedTokenCount -= request.PermitCount;
                 }
                 catch (InvalidOperationException) { }
    
                 request.Dispose();
             }
         }
     }
    
     private void ThrowIfDisposed() {
         if (Disposed) {
             throw new ObjectDisposedException(nameof(TokenBucketRateLimiter));
         }
     }
    

Ответы (1 шт):

Автор решения: Pavel Mayorov

Если вам нужен и правда предельно оптимизированный код, то вы просто обязаны обратить внимание на следующее:

  1. Ваш PermitsRequest создаёт аж три объекта - сам PermitsRequest, задачу и регистрацию токена отмены (она выглядит структурой, но внутри всё равно объект). Если с последней ничего не поделать, то PermitsRequest можно положить в пул, а чтобы не создавать задачу - вместо TaskCompletionSource надо использовать IValueTaskSource

  2. Вы используете volatile свойства под блокировкой. В этом нет смысла - блокировка сама по себе ставит все нужные барьеры для доступа к разделяемым данным, volatile тут лишь замедляет.

  3. Ожидание в цикле также создаёт по задаче на каждую итерацию. Можно вместо этого использовать System.Threading.Timer

  4. Вы используете односвязный список для освобождения PermitsRequest, что также делает лишние аллокации. Их можно избежать если использовать интрузивный список (т.е. список, сформированный полями самого PermitsRequest, а не внешним классом). Но ещё лучше - просто вызывать у регистрации Unregister вместо Dispose, тогда и список не понадобится.

→ Ссылка