C# Потокобезопасный Dictionary с асинхронным ожиданием изменений
Задача - создать потокобезопасный словарь, позволяющий асинхронно ожидать обновление значений по ключу. При том, если в значениях используется специальная коллекция, отслеживаются изменения внутри самой коллекции. Если значение было добавлено или обновлено до того, как пользователь вызывал метод ожидания обновлений, возврат происходит незамедлительно. Каким образом можно оптимизировать данный словарь для работы в высоконагруженной среде? Буду рад любым предложениям по улучшению (в том числе и не касающихся улучшений производительности).
public enum CollectionChangedAction {
ItemAdded,
ItemRemoved,
ItemUpdated,
Reset
}
public class CollectionChangedEventArgs : EventArgs {
public CollectionChangedAction Action { get; private set; }
public CollectionChangedEventArgs(CollectionChangedAction action) => Action = action;
}
public interface IObservableCollection {
public event EventHandler<CollectionChangedEventArgs> CollectionChanged;
}
public class AsyncDictionary<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>> {
private class DefaultEqualityComparer<T> : IEqualityComparer<T> {
public static DefaultEqualityComparer<T> Instance { get; } = new DefaultEqualityComparer<T>();
private DefaultEqualityComparer() { }
public bool Equals(T x, T y) {
if (default(T) == null) return ReferenceEquals(x, y);
return EqualityComparer<T>.Default.Equals(x, y);
}
public int GetHashCode(T obj) {
if (default(T) == null)
return obj != null ? RuntimeHelpers.GetHashCode(obj) : 0;
return EqualityComparer<T>.Default.GetHashCode(obj);
}
}
/// <summary>
/// Управляет подпиской на события изменения коллекции.
/// </summary>
private class CollectionChangedHandlerController {
private EventHandler<CollectionChangedEventArgs> EventHandler;
/// <summary>
/// Подключенная наблюдаемая коллекция.
/// </summary>
public IObservableCollection Collection { get; private set; }
public CollectionChangedHandlerController(IObservableCollection collection) {
Collection = collection;
}
public void AddHandler(EventHandler<CollectionChangedEventArgs> eventHandler) {
if (eventHandler == null)
throw new ArgumentNullException(nameof(eventHandler));
if (EventHandler != null)
throw new InvalidOperationException("Обработчик уже зарегистрирован для данного события.");
if (Collection != null) {
EventHandler = eventHandler;
Collection.CollectionChanged += eventHandler;
}
}
public void RemoveHandler() {
if (EventHandler != null) {
Collection.CollectionChanged -= EventHandler;
EventHandler = null;
}
}
}
private readonly bool IsObservableCollectionInValue;
private readonly ConcurrentDictionary<TKey, TValue> InnerDictionary;
private readonly ConcurrentDictionary<TKey, CollectionChangedHandlerController> InnerCollectionsDictionary;
private readonly ConcurrentDictionary<TKey, TaskCompletionSource> Awaiters = new();
private readonly IEqualityComparer<TValue> ValueEqualityComparer = DefaultEqualityComparer<TValue>.Instance;
public int Count => IsObservableCollectionInValue ? InnerCollectionsDictionary.Count : InnerDictionary.Count;
public bool IsEmpty => IsObservableCollectionInValue ? InnerCollectionsDictionary.IsEmpty : InnerDictionary.IsEmpty;
/// <summary>
/// Типы отслеживаемых изменений коллекции.
/// </summary>
public CollectionChangedAction[] TrackedChangeActions { get; private set; }
public AsyncDictionary() {
IsObservableCollectionInValue = typeof(TValue).IsAssignableTo(typeof(IObservableCollection));
if (!IsObservableCollectionInValue) InnerDictionary = new();
else {
InnerCollectionsDictionary = new();
TrackedChangeActions = [
CollectionChangedAction.ItemAdded,
CollectionChangedAction.ItemRemoved,
CollectionChangedAction.ItemUpdated,
CollectionChangedAction.Reset];
}
}
/// <summary>
/// Инициализирует новый экземпляр класса <see cref="AsyncDictionary{TKey, TValue}"/>,
/// задавая отслеживаемые типы изменений коллекции.
/// </summary>
/// <param name="trackedChangeActions">
/// Массив отслеживаемых типов изменений, происходящих в коллекции.
/// </param>
public AsyncDictionary(CollectionChangedAction[] trackedChangeActions) : this() {
if (trackedChangeActions == null)
throw new ArgumentNullException(nameof(trackedChangeActions));
if (!IsObservableCollectionInValue)
throw new NotSupportedException("Отслеживание изменений поддерживается только для значений, реализующих интерфейс IObservableCollection.");
TrackedChangeActions = trackedChangeActions.Distinct().ToArray();
}
/// <summary>
/// Инициализирует новый экземпляр класса <see cref="AsyncDictionary{TKey, TValue}"/>,
/// копируя элементы из указанного словаря.
/// </summary>
/// <param name="valueEqualityComparer">
/// Необязательный компаратор значений для определения необходимости уведомлений об обновлениях.
/// Если не указан, используется компаратор по умолчанию.
/// </param>
public AsyncDictionary(IDictionary<TKey, TValue> dictionary, IEqualityComparer<TValue> valueEqualityComparer = null) : this() {
if (dictionary == null)
throw new ArgumentNullException(nameof(dictionary));
InnerDictionary = new ConcurrentDictionary<TKey, TValue>(dictionary);
foreach (var key in dictionary.Keys) {
var completionSource = new TaskCompletionSource();
completionSource.SetResult();
Awaiters.TryAdd(key, completionSource);
}
if (valueEqualityComparer != null)
ValueEqualityComparer = valueEqualityComparer;
}
/// <summary>
/// Добавляет новый элемент или обновляет существующий элемент в словаре.
/// </summary>
public void AddOrUpdate(TKey key, TValue value) {
if (IsObservableCollectionInValue) {
void onCollectionChanged(object sender, CollectionChangedEventArgs e) {
if (TrackedChangeActions.Contains(e.Action) &&
Awaiters.TryGetValue(key, out var completionSource) &&
!completionSource.Task.IsCompleted) {
completionSource.SetResult();
}
};
var newValue = new CollectionChangedHandlerController((IObservableCollection)value);
newValue.AddHandler(onCollectionChanged);
InnerCollectionsDictionary.AddOrUpdate(key, newValue, (key, existingValue) => {
existingValue.RemoveHandler();
return newValue;
});
} else {
bool newValueProduced = true;
InnerDictionary.AddOrUpdate(key, value, (key, existingValue) => {
newValueProduced = !ValueEqualityComparer.Equals(value, existingValue);
return value;
});
if (newValueProduced)
Awaiters.GetOrAdd(key, _ => new TaskCompletionSource()).TrySetResult();
}
}
/// <summary>
/// Пытается получить значение из словаря по ключу.
/// </summary>
public bool TryGetValue(TKey key, out TValue value) {
if (IsObservableCollectionInValue) {
if (InnerCollectionsDictionary.TryGetValue(key, out var handlerController)) {
value = (TValue)handlerController.Collection;
return true;
}
value = default;
return false;
} else {
return InnerDictionary.TryGetValue(key, out value);
}
}
/// <summary>
/// Пытается удалить элемент из словаря по ключу.
/// </summary>
public bool TryRemove(TKey key) {
if (IsObservableCollectionInValue) {
if (!InnerCollectionsDictionary.TryRemove(key, out var handlerController))
return false;
handlerController.RemoveHandler();
} else if (!InnerDictionary.TryRemove(key, out _)) {
return false;
}
if (Awaiters.TryRemove(key, out var completionSource))
completionSource.TrySetCanceled();
return true;
}
/// <summary>
/// Очищает словарь, удаляя все элементы.
/// </summary>
public void Clear() {
if (IsObservableCollectionInValue) {
var handlerControllers = InnerCollectionsDictionary.Values.ToArray();
InnerCollectionsDictionary.Clear();
foreach (var handlerController in handlerControllers)
handlerController.RemoveHandler();
} else {
InnerDictionary.Clear();
}
var completionSources = Awaiters.Values.ToArray();
Awaiters.Clear();
foreach (var completionSource in completionSources)
completionSource.TrySetCanceled();
}
/// <summary>
/// Асинхронно ожидает изменения значения для указанного ключа.
/// </summary>
/// <param name="key">Ключ отслеживаемого значения.</param>
/// <param name="cancellationToken">Токен отмены операции.</param>
/// <returns>Обновлённое значение, связанное с указанным ключом.</returns>
public async ValueTask<TValue> WaitForUpdate(TKey key, CancellationToken cancellationToken) {
var completionSource = Awaiters.GetOrAdd(key, _ => new TaskCompletionSource());
await completionSource.Task.WaitAsync(cancellationToken);
Awaiters.TryUpdate(key, new TaskCompletionSource(), completionSource);
TValue value;
if (IsObservableCollectionInValue) {
if (!InnerCollectionsDictionary.TryGetValue(key, out var handlerController))
throw new OperationCanceledException();
value = (TValue)handlerController.Collection;
} else {
if (!InnerDictionary.TryGetValue(key, out value))
throw new OperationCanceledException();
}
return value;
}
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() {
if (IsObservableCollectionInValue) {
return InnerCollectionsDictionary.ToDictionary(pair => pair.Key,
pair => (TValue)pair.Value.Collection).GetEnumerator();
}
return InnerDictionary.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
Ответы (1 шт):
Не должен словарь заниматься содержимым своих значений. Этим заниматься должен кто-то другой, например Channel<T>
.
Допустим, пусть будет вот такое сообщение TextMessage
public interface IMessage { }
public class GenericMessage<T> : IMessage
{
public T Body { get; init; }
}
public class TextMessage : GenericMessage<string> { }
Тогда "на коленке" можно написать вот такой класс
public class MessageDispatcher
{
private readonly ConcurrentDictionary<Type, object> _channels = new();
public ChannelReader<T> GetConsumer<T>() where T : IMessage
{
if (_channels.ContainsKey(typeof(T)))
throw new InvalidOperationException($"Consumer for {typeof(T).Name} is already registered");
var channel = Channel.CreateUnbounded<T>();
_channels.TryAdd(typeof(T), channel.Writer);
// если канал закрывается, следующая строчка удалит его из словаря автоматически
channel.Reader.Completion.ContinueWith(_ => _channels.Remove(typeof(T), out object _));
return channel.Reader;
}
public ChannelWriter<T> GetProducer<T>() where T: IMessage
{
return _channels.TryGetValue(typeof(T), out object instance)
? (ChannelWriter<T>)instance
: throw new InvalidOperationException($"Consumer for {typeof(T).Name} is not registered");
}
}
Конечно, я не уверен, что его логика вам на 100% подойдёт, но как пример должно быть норм.
internal class Program
{
static async Task Main(string[] args)
{
MessageDispatcher dispatcher = new();
var reader = dispatcher.GetConsumer<TextMessage>();
Task consumerTask = ConsumeTextMessageAsync(reader);
var writer = dispatcher.GetProducer<TextMessage>();
writer.TryWrite(new TextMessage { Body = "Hello" });
writer.TryWrite(new TextMessage { Body = "World" });
writer.Complete();
await consumerTask;
try
{
writer = dispatcher.GetProducer<TextMessage>();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
static async Task ConsumeTextMessageAsync(ChannelReader<TextMessage> reader)
{
await foreach (TextMessage message in reader.ReadAllAsync())
{
Console.WriteLine($"Message received: {message.Body}");
}
}
}
Запускаю
Message received: Hello
Message received: World
Consumer for TextMessage is not registered
Вроде работает, и обработка ошибок тоже работает.
То есть можно напилить и отнаследовать каких угодно типов сообщений, и для каждого типа будет свой канал, который по типу того же сообщения и можно создать.
Конечно в коде выше реализована схема "один консьюмер = сколько угодно продюсеров", но по сути канал технически позволяет сделать как сколько угодно консьюмеров, так и сколько угодно продюсеров, например для балансоровки нагрузки между консьюмерами. Канал - штука мощная и имеет очень серьёзную производительность.