Как избежать дедлока при переборе значений из небуферизированного канала в GO?
Всем привет, изучаю сейчас аспекты горутин и каналов в программировании на Go. У меня есть следующий код. Который запускает несколько горутин в которых идёт запись значения в канал который равен итератору i
package main
import (
"fmt"
)
func main() {
counter := 0
numbers := []int{1, 2, 3, 4, 5}
tempChannel := make(chan int)
for _, val := range numbers {
go func(val int) {
tempChannel <- val
}(val)
}
for value := range tempChannel {
counter++
fmt.Println(value)
}
fmt.Println("Counter is", counter)
}
В целом код рабочий, но в конце я получаю deadlock. с таким выводом:
1
3
2
4
5
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
Прочитал уже много информации по каналам и посмотрел много видео на ютубе по подобной реализации, во всех примерах у людей всё получается отлично, но не могу найти несоответствие в своём коде.
Ответы (3 шт):
Решил свой вопрос, используя немного другой метод:
func main() {
counter := 0
numbers := []int{1, 2, 3, 4, 5}
tempChannel := make(chan int)
for _, val := range numbers {
go func(val int) {
tempChannel <- val
}(val)
}
for value := range tempChannel {
counter++
fmt.Println(value)
}
fmt.Println("Counter is", counter)
}
поставил горутину снаружи цикла, и в таком случае в цикле for value := range tempChannel{}
уже перестаю получать <b>deadlock</b>
.
Чтобы не было дедлока в цикле, нужно закрывать канал функцией close()
.
Как вариант, можно цикл записи в канал запустить в горутине, и записывать в канал последовательно, а после цикла закрыть канал:
counter := 0
numbers := []int{1, 2, 3, 4, 5}
tempChannel := make(chan int)
go func() {
for _, val := range numbers {
tempChannel <- val
}
close(tempChannel)
}()
for value := range tempChannel {
counter++
fmt.Println(value)
}
fmt.Println("Counter is", counter)
Также можно использовать ещё один канал для завершения программы:
counter := 0
numbers := []int{1, 2, 3, 4, 5}
tempChannel := make(chan int)
counter2 := 0
fmt.Println("Press Ctrl+C to print counter")
for _, val := range numbers {
go func(val int) {
tempChannel <- val
counter2++
}(val)
}
go func() {
for value := range tempChannel {
counter++
fmt.Println(value)
}
}()
exit := make(chan os.Signal)
signal.Notify(exit, os.Interrupt)
<-exit
close(tempChannel)
fmt.Println("Counter is", counter)
В вашем примере видится попытка написать обычное producer-consumer взаимодействие между рутинами.
Если так, то нужно учесть 2 момента:
- Закрытие канала - это тоже способ синхронизации.
- Закрытие канала производится только пишущей стороной.
Если придерживать этих 2-ух простых правил, то все будет ок. Приведу несколько примеров кода (некоторые могут быть более сложными, но идеоматичными):
С использованием WaitGroup. Этот код очень похож на ваш, однако, используется еще одна закрывающая рутина, которая дождется выполнение всех provider-ов и закроет канал consumer-а, т.е. мы как раз добиваемся того поведения, что пишущая сторона закрывает канал, который используется потребляющей стороной для чтения.
counter := 0 numbers := []int{1, 2, 3, 4, 5} ch := make(chan int) wg := &sync.WaitGroup{} for _, val := range numbers { wg.Add(1) go func(val int) { defer wg.Done() ch <- val }(val) } go func() { wg.Wait() close(ch) }() for v := range ch { counter++ fmt.Println(v) } fmt.Println("Counter is", counter)
Один провайдер, который представлен рутиной закрывает за собой канал, когда его основной код будет выполнен. Тут все так же сохраняется подход с тем, что когда отработал provider и более ничего не запишет в канал, то закрывает его за собой и тем самым сигнализирует comsumer-у, что он может спокойно выйти, после того как обработает все, что ему было передано.
counter := 0 numbers := []int{1, 2, 3, 4, 5} ch := make(chan int) go func() { defer close(ch) for _, val := range numbers { ch <- val } }() for v := range ch { counter++ fmt.Println(v) } fmt.Println("Counter is", counter)
Закрытие consumer-а через дополнительный канал (этот код показывает, как использовать context.Context, т.к. принцип будет аналогичен, у context-а под копотом имеется канал и через закрытие этого канала реализуется broadcusting на все рутины, которые его слушают). Этот пример более продвинутый и полезен, если у вас дополнительная обработка в consumer-е, т.е. вы можете выполнять полезную работу пока ждете данные из provider-ов.
counter := 0 numbers := []int{1, 2, 3, 4, 5} ch := make(chan int) done := make(chan struct{}) go func() { wg := sync.WaitGroup{} defer func() { wg.Wait() close(done) }() for _, val := range numbers { wg.Add(1) go func(val int) { defer wg.Done() ch <- val }(val) } }() loop: for { select { case <-done: break loop case val := <-ch: counter++ fmt.Println(val) } } fmt.Println("Counter is", counter)