Неблокирующий канал с буфером
Помогите сообразить, как сделать следующее, желательно в "го-стиле".
Есть рутина, которая запускает консольную утилиту и парсит её вывод. Данные идут порциями, их частота от меня никак не зависит. Может быть 10+ пакетов в секунду, может 1. Вторая рутина должна брать эти пакеты, немного их обработать и отправить на внешний веб-сервис.
Изначально идея была - делаем канал между этими двуми рутинами. Парсим пакет - записываем в канал - читаем с канала - отправляем. Всё хорошо, пока связь стабильная и быстрая. Как только связь теряется, тормозит - начинается затык. Данные сыпятся быстрее, чем мы можем их отправлять. Важно, что потеря части данных не критична, главное чтобы последние N пакетов держались в памяти и по возможности отправлялись.
Напрашивается что-то типа очереди с автоматическим удалением старых элементов при превышении лимита N. Первая горутина туда кладёт, вторая оттуда достаёт.
То есть другими словами, нужно сделать такой канал с буфером, который бы при записи в него не блокировал (если в канале уже N элементов - самое старое значение удаляется, новое добавляется), а при чтении - блокировал (при отсутствии данных).
Станадртный буферизированный канал не подходит - при заполненном буфере он блокирует запись.
Так как в го я новичок - трудно сообразить как такое наколхозить. Попробовал поискать по репам/гуглу - ничего похожего найти не могу.
Подскажите, куда копать, где можно подсмотреть что-то похожее?
Ответы (1 шт):
Вроде разобрался сам, всё оказалось проще чем представлялось. В сценарии с один писателем-читателем должно работать без ошибок, насколько я понимаю.
type StackBuffer[T any] struct {
capacity int
out chan T
}
func NewStackBuffer[T any](capacity int) *StackBuffer[T] {
return &StackBuffer[T]{
capacity: capacity,
out: make(chan T, capacity),
}
}
func (q *StackBuffer[T]) Push(value T) {
if len(q.out) == q.capacity {
<-q.out
}
q.out <- value
}
func (q *StackBuffer[T]) Pop() <-chan T {
return q.out
}
func (q *StackBuffer[T]) Close() {
close(q.out)
}
Тестовый код:
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
queue := NewStackBuffer[int](5)
go func() {
for i := 0; i < 100; i++ {
queue.Push(i)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
queue.Close()
}()
for v := range queue.Pop() {
fmt.Printf("Pops from channel value: %d\n", v)
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
}
}