golang: Вставка данных в один массив из нескольких горутин
Как правильно написать код максимально быстрой работы всех пяти функций f1-f5 по вставке данных в один массив test? На примере кода, который работает криво:
package main
import (
"fmt"
"strconv"
"strings"
"time"
)
var data = []string{}
var test = []string{}
var ch_i = []chan string{}
var ch_o = []string{}
var ch = make(chan string, 'z')
func f0() {
for j := 0; j < 10; j++ {
data = append(data, strconv.Itoa(j))
}
fmt.Println("\n data: ", data)
}
func main() {
f0()
// - - - - - -
go f1("A", ch)
go f2("B", ch)
f3("C", ch)
go f4("D", ch)
go f5("E", ch)
// - - - - - -
time.Sleep(3 * time.Second)
fmt.Println("\nlen ch_i: ", len(ch_i), "\ncap ch_i: ", cap(ch_i), "\n ch_i: ", ch_i, "\n ")
fmt.Println("\nlen ch_o: ", len(ch_o), "\ncap ch_o: ", cap(ch_o), "\n ch_o: ", strings.Join(ch_o, ", "), "\n ")
fmt.Println("\nlen test: ", len(test), "\ncap test: ", cap(test), "\n test: ", strings.Join(test, ", "), "\n ")
}
func f1(s string, ch chan string) {
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Nanosecond)
ch_i = append(ch_i, ch)
test = append(test, s+data[i])
ch_o = append(ch_o, "a")
ch <- "a"
}
}
func f2(s string, ch chan string) {
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Nanosecond)
ch_i = append(ch_i, ch)
test = append(test, s+data[i])
ch_o = append(ch_o, "b")
ch <- "b"
}
}
func f3(s string, ch chan string) {
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Nanosecond)
ch_i = append(ch_i, ch)
test = append(test, s+data[i])
ch_o = append(ch_o, "c")
ch <- "c"
}
}
func f4(s string, ch chan string) {
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Nanosecond)
ch_i = append(ch_i, ch)
test = append(test, s+data[i])
ch_o = append(ch_o, "d")
ch <- "d"
}
}
func f5(s string, ch chan string) {
for i := 0; i < 10; i++ {
time.Sleep(10 * time.Nanosecond)
ch_i = append(ch_i, ch)
test = append(test, s+data[i])
ch_o = append(ch_o, "e")
ch <- "e"
}
}
Ответы (1 шт):
Я бы сделал синхронный массив, атомарность добавления в который гарантируется мьютексами:
type SyncArray[T any] struct {
array []T
lock sync.Mutex
}
func (ca *SyncArray[T]) Append(val ...T) *SyncArray[T] {
ca.lock.Lock()
defer ca.lock.Unlock()
ca.array = append(ca.array, val...)
return ca
}
func (ca *SyncArray[T]) Array() []T {
ca.lock.Lock()
defer ca.lock.Unlock()
return ca.array[:]
}
func (ca *SyncArray[T]) Len() int {
return len(ca.array)
}
Работает довольно быстро. Бенчмарк для сравнения записи в массив из пяти горутин с последовательными добавлениями такого же количества элементов.
import (
"sync"
"testing"
"example.org/concarray"
"github.com/stretchr/testify/assert"
)
const (
numOfAppends = 100_000
numOfWorkers = 5
)
func syncWriter(sa *concarray.SyncArray[int], val int, count int, wg *sync.WaitGroup) {
for i := 0; i < count; i++ {
sa.Append(val)
}
wg.Done()
}
func BenchmarkSyncArray(b *testing.B) {
for i := 0; i < b.N; i++ {
sa := concarray.SyncArray[int]{}
wg := sync.WaitGroup{}
for j := 0; j < numOfWorkers; j++ {
wg.Add(1)
go syncWriter(&sa, j, numOfAppends, &wg)
}
wg.Wait()
assert.Equal(b, numOfWorkers*numOfAppends, sa.Len())
}
}
func BenchmarkPlainArrayt(b *testing.B) {
for i := 0; i < b.N; i++ {
arr := []int{}
for j := 0; j < numOfWorkers; j++ {
for k := 0; k < numOfAppends; k++ {
arr = append(arr, j)
}
}
assert.Equal(b, numOfWorkers*numOfAppends, len(arr))
}
}
Результат:
goos: linux
goarch: amd64
pkg: example.org/concarray
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
BenchmarkSyncArray-8 19 60360342 ns/op 21083928 B/op 44 allocs/op
BenchmarkPlainArrayt-8 51 24338208 ns/op 21083450 B/op 36 allocs/op
PASS
Видно, что синхронный массив проигрывает примерно в два с половиной раза по времени и делает больше аллокаций, но зато не портится при параллельном обновлении.
Я сделал вариант с каналами, когда есть выделенная горутина, модифицирующая массив, но этот вариант оказался гораздо медленнее
type MutiWriteArray[T any] struct {
array []T
done chan struct{}
stream chan T
closed bool
finished sync.WaitGroup
}
func NewMultiWriteArray[T any]() *MutiWriteArray[T] {
arr := &MutiWriteArray[T]{
array: []T{},
done: make(chan struct{}),
stream: make(chan T),
closed: false,
}
arr.finished.Add(1)
go func() {
for {
select {
case v := <-arr.stream:
arr.array = append(arr.array, v)
case <-arr.done:
close(arr.stream)
arr.closed = true
arr.finished.Done()
return
}
}
}()
return arr
}
func (mwa *MutiWriteArray[T]) Append(val ...T) *MutiWriteArray[T] {
for _, v := range val {
mwa.stream <- v
}
return mwa
}
func (mwa *MutiWriteArray[T]) Array() []T {
return mwa.array[:]
}
func (mwa *MutiWriteArray[T]) Len() int {
return len(mwa.array)
}
func (mwa *MutiWriteArray[T]) Close() {
close(mwa.done)
mwa.finished.Wait()
}