C# получение данных по SerialPort и запись в переменную, например, в Queue
Есть устройство, которое шлёт пакеты по SerialPort в произвольное время. Я хочу слушать порт, получать и записывать принятые байты из пакетов в переменную, чтобы потом оттуда брать и использовать дальше. Я выбрал тип Queue, потому что посчитал, что будет удобно с ним работать - в каком порядке пакеты получил, в таком обработал. Нашёл вопрос: Проблема получения данных из SerialPort в службе C#. Попробовал, в файл пишет всё как надо, переписал под запись в очередь:
class ComWatcher
{
public ComWatcher(SerialPort serialPort)
{
port = serialPort;
}
SerialPort port;
CancellationTokenSource cts;
private Queue<byte[]> _queueOfBytes = new();
public Queue<byte[]> QueueOfBytes
{
get { return _queueOfBytes; }
set { _queueOfBytes = value; }
}
public async void Start()
{
if (cts != null)
throw new InvalidOperationException("Already started");
cts = new CancellationTokenSource();
CancellationToken ct = cts.Token;
try
{
if(!port.IsOpen)
port.Open();
byte[] buf = new byte[16];
var stream = port.BaseStream;
while (!ct.IsCancellationRequested)
{
var actuallyRead = await stream.ReadAsync(buf, 0, buf.Length, ct);
if (actuallyRead == 0) // end of stream
return;
await EnqueueAsync(buf);
}
}
catch (IOException io)
{
Console.WriteLine(io.Message);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
public void Stop()
{
var cts = this.cts;
this.cts = null;
if (cts == null)
throw new InvalidOperationException("Not started");
port.Close();
cts.Cancel();
}
public async Task EnqueueAsync(byte[] buf)
{
try
{
await Task.Run(() => QueueOfBytes.Enqueue(buf));
}
catch (Exception ex)
{
Console.WriteLine($"{ex.Message}");
}
}
}
К сожалению, в Queue нет WriteAsync, поэтому попытался написать сам. Код работает некорректно, потому что, например, пакет отсылается такой:
[01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16]
А в результате могут получаться следующие варианты:
[01 остальные байты 00],
[02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 01],
[01 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15]
Иногда отдельные байты или отсутствуют или дублируются, будто бы берутся из других пакетов. Подозреваю, что что-то не так с асинхронностью. Как считывать пакеты по SerialPort и записывать в Queue или какой либо другой тип данных?
Ответы (1 шт):
Queue<T>
не является потокобезопасной коллекцией, на этот случай есть ConcurrentQueue<T>
, но вам даже и она не очень удобна будет. А суть ошибки в том, что вы кидаете в очередь массив, в который затем перезаписываете данные. Буфер надо в этом случае создавать на каждую итерацию цикла.
Предлагаю использовать нормальный потокобезопасный Producer/Consumer - Channel<T>
.
class ComWatcher
{
private readonly SerialPort port;
private CancellationTokenSource cts;
private Channel<byte[]> channel;
public ChannelReader<byte[]> Reader => channel?.Reader ?? throw new InvalidOperationException("Not initialized");
public ComWatcher(SerialPort serialPort)
{
port = serialPort;
}
public async void Start()
{
if (cts != null)
throw new InvalidOperationException("Already started");
channel = Channel.CreateUnbounded<byte[]>();
using (cts = new CancellationTokenSource())
{
CancellationToken ct = cts.Token;
try
{
if (!port.IsOpen)
port.Open();
var stream = port.BaseStream;
byte[] buf = new byte[16];
while (true)
{
var actuallyRead = await stream.ReadAsync(buf, 0, buf.Length, ct);
if (actuallyRead == 0) // end of stream
return;
channel.Writer.TryWrite(buf[..actuallyRead]); // здесь Range создаёт копию массива, поэтому всё будет ОК
}
}
catch (IOException io)
{
Console.WriteLine(io.Message);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
cts = null;
channel.Writer.Complete();
port.Close();
}
public void Stop()
{
if (cts == null)
throw new InvalidOperationException("Not started");
cts.Cancel();
}
}
Использовать как-то так
ComWatcher watcher = new(new SerialPort("COM1"));
watcher.Start();
await foreach (byte[] buffer in watcher.Reader.ReadAllAsync())
{
Console.WriteLine($"[{string.Join(" ", buffer.Select(x => x.ToString("x2")))}]");
}
При вызове Stop()
цикл сам завершится. Кстати, буфер пожирнее можно использовать, раз в 1000.