Неблокирующий канал с буфером

Помогите сообразить, как сделать следующее, желательно в "го-стиле".

Есть рутина, которая запускает консольную утилиту и парсит её вывод. Данные идут порциями, их частота от меня никак не зависит. Может быть 10+ пакетов в секунду, может 1. Вторая рутина должна брать эти пакеты, немного их обработать и отправить на внешний веб-сервис.

Изначально идея была - делаем канал между этими двуми рутинами. Парсим пакет - записываем в канал - читаем с канала - отправляем. Всё хорошо, пока связь стабильная и быстрая. Как только связь теряется, тормозит - начинается затык. Данные сыпятся быстрее, чем мы можем их отправлять. Важно, что потеря части данных не критична, главное чтобы последние N пакетов держались в памяти и по возможности отправлялись.

Напрашивается что-то типа очереди с автоматическим удалением старых элементов при превышении лимита N. Первая горутина туда кладёт, вторая оттуда достаёт.

То есть другими словами, нужно сделать такой канал с буфером, который бы при записи в него не блокировал (если в канале уже N элементов - самое старое значение удаляется, новое добавляется), а при чтении - блокировал (при отсутствии данных).

Станадртный буферизированный канал не подходит - при заполненном буфере он блокирует запись.

Так как в го я новичок - трудно сообразить как такое наколхозить. Попробовал поискать по репам/гуглу - ничего похожего найти не могу.

Подскажите, куда копать, где можно подсмотреть что-то похожее?


Ответы (1 шт):

Автор решения: just.slon

Вроде разобрался сам, всё оказалось проще чем представлялось. В сценарии с один писателем-читателем должно работать без ошибок, насколько я понимаю.

type StackBuffer[T any] struct {
    capacity int
    out      chan T
}

func NewStackBuffer[T any](capacity int) *StackBuffer[T] {
    return &StackBuffer[T]{
        capacity: capacity,
        out:      make(chan T, capacity),
    }
}

func (q *StackBuffer[T]) Push(value T) {
    if len(q.out) == q.capacity {
        <-q.out
    }
    q.out <- value
}

func (q *StackBuffer[T]) Pop() <-chan T {
    return q.out
}

func (q *StackBuffer[T]) Close() {
    close(q.out)
}

Тестовый код:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    queue := NewStackBuffer[int](5)

    go func() {
        for i := 0; i < 100; i++ {
            queue.Push(i)
            time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        }
        queue.Close()
    }()

    for v := range queue.Pop() {
        fmt.Printf("Pops from channel value: %d\n", v)
        time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
    }
}

→ Ссылка