Как работать в цикле с более чем 1 горутиной?

Есть входные данные, кол-во потоков и кол-во профилей. Каждая подписка увеличивает profile на 1, пока он не достигнет totalProfile

Все это должно выполняться параллельно в определенном кол-ве потоков Threads

Сейчас каждый поток проходится по всему списку и в конце падает "panic: sync: negative WaitGroup counter"

func Sub(threads, totalProfiles int) {

var wg sync.WaitGroup
var mu sync.Mutex
var profile = 1
wg.Add(profile)

start := time.Now()

for k := 1; k < threads; k++ {

    go func(k int) {
        for i := profile; profile < totalProfiles; i++ {
            fmt.Printf("Thread %v: profile %v done\n", k, i)
            mu.Lock()
            profile++
            mu.Unlock()
        }
        defer wg.Done()
    }(k)
}
wg.Wait()
fmt.Println(time.Now().Sub(start).Seconds())
}

Ответы (2 шт):

Автор решения: newman

Потому что вы делаете добавление записи в WaitGroup только один раз. А нужно каждый раз перед тем как запускаете горутину. Т.е. wg.Add(profile) нужно перенести внутри цикла for - но перед вызовом go

И нужно использовать wg.Add(1) - переменная тут лишняя. Вернее даже вредная.

А так вызов wg.Add(1) увеличивает счетчик на 1. Вызов wg.Done() уменьшает счетчик на 1.

UPD: по вопросам в комментариях

Самый простой вариант это наверное что-то такое

func Sub(threads, totalProfiles int) {

var wg sync.WaitGroup
var mu sync.Mutex

start := time.Now()
chunksize = totalProfiles / threads

for k := 0; k < threads; k++ {
    wg.Add(1)
    go func(s, e int) {
        for i := s; i < e; i++ {
            fmt.Printf("Thread %v: profile %v done\n", k, i)
        }
        defer wg.Done()
    }(k * chunksize, (k+1)*chunksize)
}
wg.Wait()
fmt.Println(time.Now().Sub(start).Seconds())
}

Т.е. весь список профилей разбиваем на равные куски. И для каждого куска запускаем отдельную горутину. Писал так сказать "на коленке" так что могут быт ькакие-то неточности по синтаксису или же логике. Но думаю что основная идея понятна.

→ Ссылка
Автор решения: Mark
func Sub(threads, totalProfiles int) {

    wg := sync.WaitGroup{}
    wg.Add(threads)

    // очередь задач
    tasks := make(chan int)

    // запуск пула воркеров,  обрабатывающих задачи
    for k := 0; k < threads; k++ {
        // воркер в виде анонимной функции
        go func(k int) {
            defer wg.Done() // место для defer
            // воркер будет обрабатывать задачи из очереди пока очередь задач не закроется
            for task := range tasks {
                fmt.Printf("Thread %d: profile %d done\n", k, task)
            }
        }(k)
    }

    // передача задач в очередь
    for i := 0; i < totalProfiles; i++ {
        tasks <- i
    }
    // закрываем канал после того, как все задачи в него переданы
    close(tasks)

    // ждем пока все воркеры завершат работу
    wg.Wait()
}

https://go.dev/play/p/5jJMraStA4k

→ Ссылка