Как избежать дедлока при переборе значений из небуферизированного канала в GO?

Всем привет, изучаю сейчас аспекты горутин и каналов в программировании на Go. У меня есть следующий код. Который запускает несколько горутин в которых идёт запись значения в канал который равен итератору i

package main

import (
    "fmt"
)

func main() {
    counter := 0
    numbers := []int{1, 2, 3, 4, 5}
    tempChannel := make(chan int)
    for _, val := range numbers {
        go func(val int) {
            tempChannel <- val
        }(val)
    }

    for value := range tempChannel {
        counter++
        fmt.Println(value)
    }
    fmt.Println("Counter is", counter)
}

В целом код рабочий, но в конце я получаю deadlock. с таким выводом:

1
3                                                                     
2                                                                     
4                                                                     
5                                                                     
fatal error: all goroutines are asleep - deadlock!                    
                                                                      
goroutine 1 [chan receive]:                                           
main.main()

Прочитал уже много информации по каналам и посмотрел много видео на ютубе по подобной реализации, во всех примерах у людей всё получается отлично, но не могу найти несоответствие в своём коде.


Ответы (3 шт):

Автор решения: Алёша Попович

Решил свой вопрос, используя немного другой метод:

func main() {
    counter := 0
    numbers := []int{1, 2, 3, 4, 5}
    tempChannel := make(chan int)
    for _, val := range numbers {
        go func(val int) {
            tempChannel <- val
        }(val)
    }

    for value := range tempChannel {
        counter++
        fmt.Println(value)
    }
    fmt.Println("Counter is", counter)
}

поставил горутину снаружи цикла, и в таком случае в цикле for value := range tempChannel{} уже перестаю получать <b>deadlock</b>.

→ Ссылка
Автор решения: Алексей

Чтобы не было дедлока в цикле, нужно закрывать канал функцией close().

Как вариант, можно цикл записи в канал запустить в горутине, и записывать в канал последовательно, а после цикла закрыть канал:

counter := 0
numbers := []int{1, 2, 3, 4, 5}
tempChannel := make(chan int)

go func() {
    for _, val := range numbers {
        tempChannel <- val
    }
    close(tempChannel)
}()

for value := range tempChannel {
    counter++
    fmt.Println(value)
}
fmt.Println("Counter is", counter)

Также можно использовать ещё один канал для завершения программы:

counter := 0
numbers := []int{1, 2, 3, 4, 5}
tempChannel := make(chan int)
counter2 := 0

fmt.Println("Press Ctrl+C to print counter")

for _, val := range numbers {
    go func(val int) {
        tempChannel <- val
        counter2++
    }(val)
}
go func() {
    for value := range tempChannel {
        counter++
        fmt.Println(value)
    }
}()
exit := make(chan os.Signal)
signal.Notify(exit, os.Interrupt)
<-exit
close(tempChannel)

fmt.Println("Counter is", counter)
→ Ссылка
Автор решения: Borislav

В вашем примере видится попытка написать обычное producer-consumer взаимодействие между рутинами.

Если так, то нужно учесть 2 момента:

  1. Закрытие канала - это тоже способ синхронизации.
  2. Закрытие канала производится только пишущей стороной.

Если придерживать этих 2-ух простых правил, то все будет ок. Приведу несколько примеров кода (некоторые могут быть более сложными, но идеоматичными):

  1. С использованием WaitGroup. Этот код очень похож на ваш, однако, используется еще одна закрывающая рутина, которая дождется выполнение всех provider-ов и закроет канал consumer-а, т.е. мы как раз добиваемся того поведения, что пишущая сторона закрывает канал, который используется потребляющей стороной для чтения.

    counter := 0
    numbers := []int{1, 2, 3, 4, 5}
    ch := make(chan int)
    
    wg := &sync.WaitGroup{}
    for _, val := range numbers {
        wg.Add(1)
        go func(val int) {
            defer wg.Done()
            ch <- val
        }(val)
    }
    go func() {
        wg.Wait()
        close(ch)
    }()
    
    for v := range ch {
        counter++
        fmt.Println(v)
    }
    
    fmt.Println("Counter is", counter)
    
  2. Один провайдер, который представлен рутиной закрывает за собой канал, когда его основной код будет выполнен. Тут все так же сохраняется подход с тем, что когда отработал provider и более ничего не запишет в канал, то закрывает его за собой и тем самым сигнализирует comsumer-у, что он может спокойно выйти, после того как обработает все, что ему было передано.

    counter := 0
    numbers := []int{1, 2, 3, 4, 5}
    ch := make(chan int)
    
    go func() {
        defer close(ch)
        for _, val := range numbers {
            ch <- val
        }
    }()
    
    for v := range ch {
        counter++
        fmt.Println(v)
    }
    
    fmt.Println("Counter is", counter)
    
  3. Закрытие consumer-а через дополнительный канал (этот код показывает, как использовать context.Context, т.к. принцип будет аналогичен, у context-а под копотом имеется канал и через закрытие этого канала реализуется broadcusting на все рутины, которые его слушают). Этот пример более продвинутый и полезен, если у вас дополнительная обработка в consumer-е, т.е. вы можете выполнять полезную работу пока ждете данные из provider-ов.

    counter := 0
    numbers := []int{1, 2, 3, 4, 5}
    ch := make(chan int)
    done := make(chan struct{})
    
    go func() {
        wg := sync.WaitGroup{}
        defer func() {
            wg.Wait()
            close(done)
        }()
        for _, val := range numbers {
            wg.Add(1)
            go func(val int) {
                defer wg.Done()
                ch <- val
            }(val)
        }
    }()
    
    loop:
        for {
            select {
            case <-done:
                break loop
            case val := <-ch:
                counter++
                fmt.Println(val)
            }
        }
    
        fmt.Println("Counter is", counter)
    
→ Ссылка