C# получение данных по SerialPort и запись в переменную, например, в Queue

Есть устройство, которое шлёт пакеты по SerialPort в произвольное время. Я хочу слушать порт, получать и записывать принятые байты из пакетов в переменную, чтобы потом оттуда брать и использовать дальше. Я выбрал тип Queue, потому что посчитал, что будет удобно с ним работать - в каком порядке пакеты получил, в таком обработал. Нашёл вопрос: Проблема получения данных из SerialPort в службе C#. Попробовал, в файл пишет всё как надо, переписал под запись в очередь:

    class ComWatcher
    {
        public ComWatcher(SerialPort serialPort)
        {
            port = serialPort;
        }

        SerialPort port;
        CancellationTokenSource cts;

        private Queue<byte[]> _queueOfBytes = new();
        public Queue<byte[]> QueueOfBytes
        {
            get { return _queueOfBytes; }
            set { _queueOfBytes = value; }
        }

        public async void Start()
        {
            if (cts != null)
                throw new InvalidOperationException("Already started");
            cts = new CancellationTokenSource();
            CancellationToken ct = cts.Token;
            try
            {
                if(!port.IsOpen)
                    port.Open();
                byte[] buf = new byte[16];
                var stream = port.BaseStream;

                    while (!ct.IsCancellationRequested)
                    {
                        var actuallyRead = await stream.ReadAsync(buf, 0, buf.Length, ct);
                        if (actuallyRead == 0) // end of stream
                            return;
                        await EnqueueAsync(buf);
                    }
            }
            catch (IOException io)
            {
                Console.WriteLine(io.Message);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        public void Stop()
        {
            var cts = this.cts;
            this.cts = null;
            if (cts == null)
                throw new InvalidOperationException("Not started");
            port.Close();
            cts.Cancel();
        }

        public async Task EnqueueAsync(byte[] buf)
        {
            try
            {
                await Task.Run(() => QueueOfBytes.Enqueue(buf));
            }
            catch (Exception ex)
            {
                Console.WriteLine($"{ex.Message}");
            }

        }
    }

К сожалению, в Queue нет WriteAsync, поэтому попытался написать сам. Код работает некорректно, потому что, например, пакет отсылается такой:

[01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16]

А в результате могут получаться следующие варианты:

[01 остальные байты 00], 
[02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 01], 
[01 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15]

Иногда отдельные байты или отсутствуют или дублируются, будто бы берутся из других пакетов. Подозреваю, что что-то не так с асинхронностью. Как считывать пакеты по SerialPort и записывать в Queue или какой либо другой тип данных?


Ответы (1 шт):

Автор решения: aepot

Queue<T> не является потокобезопасной коллекцией, на этот случай есть ConcurrentQueue<T>, но вам даже и она не очень удобна будет. А суть ошибки в том, что вы кидаете в очередь массив, в который затем перезаписываете данные. Буфер надо в этом случае создавать на каждую итерацию цикла.

Предлагаю использовать нормальный потокобезопасный Producer/Consumer - Channel<T>.

class ComWatcher
{
    private readonly SerialPort port;
    private CancellationTokenSource cts;
    private Channel<byte[]> channel;

    public ChannelReader<byte[]> Reader => channel?.Reader ?? throw new InvalidOperationException("Not initialized");

    public ComWatcher(SerialPort serialPort)
    {
        port = serialPort;
    }

    public async void Start()
    {
        if (cts != null)
            throw new InvalidOperationException("Already started");
        channel = Channel.CreateUnbounded<byte[]>();
        using (cts = new CancellationTokenSource())
        {
            CancellationToken ct = cts.Token;
            try
            {
                if (!port.IsOpen)
                    port.Open();
                var stream = port.BaseStream;
                byte[] buf = new byte[16];

                while (true)
                {
                    var actuallyRead = await stream.ReadAsync(buf, 0, buf.Length, ct);
                    if (actuallyRead == 0) // end of stream
                        return;
                    channel.Writer.TryWrite(buf[..actuallyRead]); // здесь Range создаёт копию массива, поэтому всё будет ОК
                }
            }
            catch (IOException io)
            {
                Console.WriteLine(io.Message);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
        cts = null;
        channel.Writer.Complete();
        port.Close();
    }

    public void Stop()
    {
        if (cts == null)
            throw new InvalidOperationException("Not started");            
        cts.Cancel();
    }
}

Использовать как-то так

ComWatcher watcher = new(new SerialPort("COM1"));
watcher.Start();
await foreach (byte[] buffer in watcher.Reader.ReadAllAsync())
{
    Console.WriteLine($"[{string.Join(" ", buffer.Select(x => x.ToString("x2")))}]");
}

При вызове Stop() цикл сам завершится. Кстати, буфер пожирнее можно использовать, раз в 1000.

→ Ссылка