Сохранение порядка входных и выходных данных при использовании горутин
Имеется функция, которая выполняет переданные ей функции параллельно, собирает результат и возвращает:
func gather(funcs []func() any) []any {
var res []any
stream := make(chan any, len(funcs))
for _, elem := range funcs {
go func(elem func() any) {
stream <- elem()
}(elem)
}
for i := 0; i < len(funcs); i++ {
res = append(res, <-stream)
}
return res
}
В эту функцию отправляются функции считающие квадрат с задержкой:
func squared(n int) func() any {
return func() any {
time.Sleep(time.Duration(n) * 100 * time.Millisecond)
return n * n
}
}
Из main() они отправляются так:
funcs := []func() any{squared(4), squared(3), squared(2)}
nums := gather(funcs)
Проблема состоит в том, что в рамках задачи необходимо вернуть результаты ровно в том же порядке, что они были отправлены. Для примера выше правильным выводом будет: [16 9 4], по факту конечно же получаем [4 9 16]. Насколько я понимаю, вызвано это тем, что функции с меньшим входным числом отрабатывают немного быстрее и результат попадает в буферизированный канал раньше. Я пробовал использовать WaitGroup, например так:
func gather(funcs []func() any) []any {
var wg sync.WaitGroup
wg.Add(len(funcs))
var res []any
stream := make(chan any, len(funcs))
for _, elem := range funcs {
go func(elem func() any) {
defer wg.Done()
stream <- elem()
}(elem)
}
wg.Wait()
for i := 0; i < len(funcs); i++ {
res = append(res, <-stream)
}
return res
}
Пробовал использовать канал завершения <-done. Все мои попытки либо не приводят к желаемому результату, либо убивают всю идею многопоточности (например, ожидать завершения горутины в каждой итерации цикла). Что я упускаю? Какой есть метод сохранения порядка вывода? Я не разбираюсь в JS, но насколько я понимаю в других языках это делается с помощью async/await и например Promise.all (в JS). Как это реализовать в моих условиях на Golang?
Ответы (2 шт):
func gather(funcs []func() any) []any {
// мы знаем размер массива с функциями, поэтому можем сразу выделить память
res := make([]any, len(funcs))
stream := make(chan any, len(funcs))
var wg sync.WaitGroup
wg.Add(len(funcs))
for idx, elem := range funcs {
idx := idx // нужно сделать копию, иначе все функции будут использовать один и тот же idx
// этот индекс уже есть порядковый номер вызова
go func(elem func() any) {
stream <- elem()
res[idx] = elem()
wg.Done()
}(elem)
}
wg.Wait()
return res
}
но операции с массивами не потоконебезопасно. но так как мы создали массив определенной длины, и ссылаемся на конкретный индекс, то можно так сделать
Вариант решения #1 с использованием структуры и без прямой записи в срез в горутине.
func gather(funcs []func() any) []any {
// результат вызова функции и ее индекс
type result struct {
idx int
val any
}
n := len(funcs)
// запускаем каждую функцию в горутине и отправляем результат в канал
ready := make(chan result, n)
for idx, fn := range funcs {
idx := idx
fn := fn
go func() {
ready <- result{idx, fn()}
}()
}
// забираем результаты из канала и аккумулируем в итоговом срезе
results := make([]any, n)
for i := 0; i < n; i++ {
res := <-ready
results[res.idx] = res.val
}
return results
}
Вариант решения #2 с прямым обращением к срезу и использованием закрывающего канала для отслеживания отработки всех запущенных горутин.
func gather(funcs []func() any) []any {
done := make(chan struct{})
//создаем срез размером по количеству переданных функций
results := make([]any, len(funcs))
// запускаем функции в отдельной горутине
for idx, fn := range funcs {
idx, fn := idx, fn
go func() {
// записываем результат i-й функции в i-ю позицию среза
results[idx] = fn()
done <- struct{}{}
}()
}
// дожидаемся, пока отработают все горутины
for i := 0; i < len(funcs); i++ {
<-done
}
return results
}