Как работать в цикле с более чем 1 горутиной?
Есть входные данные, кол-во потоков и кол-во профилей. Каждая подписка увеличивает profile на 1, пока он не достигнет totalProfile
Все это должно выполняться параллельно в определенном кол-ве потоков Threads
Сейчас каждый поток проходится по всему списку и в конце падает "panic: sync: negative WaitGroup counter"
func Sub(threads, totalProfiles int) {
var wg sync.WaitGroup
var mu sync.Mutex
var profile = 1
wg.Add(profile)
start := time.Now()
for k := 1; k < threads; k++ {
go func(k int) {
for i := profile; profile < totalProfiles; i++ {
fmt.Printf("Thread %v: profile %v done\n", k, i)
mu.Lock()
profile++
mu.Unlock()
}
defer wg.Done()
}(k)
}
wg.Wait()
fmt.Println(time.Now().Sub(start).Seconds())
}
Ответы (2 шт):
Потому что вы делаете добавление записи в WaitGroup только один раз.
А нужно каждый раз перед тем как запускаете горутину. Т.е. wg.Add(profile) нужно перенести внутри цикла for - но перед вызовом go
И нужно использовать wg.Add(1) - переменная тут лишняя. Вернее даже вредная.
А так вызов wg.Add(1) увеличивает счетчик на 1. Вызов wg.Done() уменьшает счетчик на 1.
UPD: по вопросам в комментариях
Самый простой вариант это наверное что-то такое
func Sub(threads, totalProfiles int) {
var wg sync.WaitGroup
var mu sync.Mutex
start := time.Now()
chunksize = totalProfiles / threads
for k := 0; k < threads; k++ {
wg.Add(1)
go func(s, e int) {
for i := s; i < e; i++ {
fmt.Printf("Thread %v: profile %v done\n", k, i)
}
defer wg.Done()
}(k * chunksize, (k+1)*chunksize)
}
wg.Wait()
fmt.Println(time.Now().Sub(start).Seconds())
}
Т.е. весь список профилей разбиваем на равные куски. И для каждого куска запускаем отдельную горутину. Писал так сказать "на коленке" так что могут быт ькакие-то неточности по синтаксису или же логике. Но думаю что основная идея понятна.
func Sub(threads, totalProfiles int) {
wg := sync.WaitGroup{}
wg.Add(threads)
// очередь задач
tasks := make(chan int)
// запуск пула воркеров, обрабатывающих задачи
for k := 0; k < threads; k++ {
// воркер в виде анонимной функции
go func(k int) {
defer wg.Done() // место для defer
// воркер будет обрабатывать задачи из очереди пока очередь задач не закроется
for task := range tasks {
fmt.Printf("Thread %d: profile %d done\n", k, task)
}
}(k)
}
// передача задач в очередь
for i := 0; i < totalProfiles; i++ {
tasks <- i
}
// закрываем канал после того, как все задачи в него переданы
close(tasks)
// ждем пока все воркеры завершат работу
wg.Wait()
}