php小编草莓在本文中将为大家介绍多个goroutine从同一通道读取的相关内容。在并发编程中,goroutine是Go语言中的轻量级线程,可以同时执行多个任务。通道是goroutine之间进行通信的重要方式。当多个goroutine需要从同一个通道读取数据时,我们需要注意一些问题,并采取相应的措施来确保程序的正确性和效率。在接下来的内容中,我们将详细解释这个过程,并提供一些实用的技巧和建议。
问题内容
考虑生成多个 goroutine 以从同一通道读取值。两个工作人员按预期生成,但只从通道中读取一项并停止读取。我期望 goroutine 继续从通道读取数据,直到将值发送到通道的 goroutine 关闭为止。尽管某些东西阻止了发送者发送,但生成项目的 goroutine 并未关闭。为什么每个工人只读取一个值并停止?
输出显示发送的两个值,每个工作 goroutine 各读取一个值。第三个值已发送,但未从任何一个工作线程中读取。
new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0
去游乐场
package main
import (
"fmt"
"sync"
)
func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
out := make(chan int)
var wg sync.WaitGroup
for i := 0; i < numberOfWorkers; i++ {
fmt.Println("new worker")
wg.Add(1)
// fan out worker goroutines reading from in channel and
// send output into out channel
go func() {
defer wg.Done()
for {
select {
case <-done:
fmt.Println("recieved done signal")
return
case data, ok := <-in:
if !ok {
fmt.Println("no more items")
return
}
// fan-in job execution multiplexing results into the results channel
fmt.Println("running func", data)
value := fn(data)
fmt.Println("sending value out", value)
out <- value
}
}
}()
}
fmt.Println("waiting")
wg.Wait()
fmt.Println("done waiting")
close(out)
return out
}
func main() {
done := make(chan bool)
defer close(done)
in := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println("sending", i)
in <- i
}
close(in)
}()
out := workerPool(done, in, 2, func(i int) int {
return i
})
for {
select {
case o, ok := <-out:
if !ok {
continue
}
fmt.Println("output", o)
case <-done:
return
default:
}
}
}
解决方法
之前关于通道未缓冲的评论是正确的,但还存在其他同步问题。
无缓冲通道本质上意味着写入值时,必须在发生任何其他写入之前接收该值。
workerpool
创建一个无缓冲通道out
来存储结果,但只有在所有结果写入 out 后才返回。但由于从 out 通道的读取发生在out
返回之后,并且out
没有缓冲,因此workerpool
在尝试写入时被阻塞,从而导致死锁。这就是为什么看起来每个工作人员只发送一个值;实际上,在发送第一个之后,所有工作人员都被阻止,因为没有任何东西可以接收该值(您可以通过在写入out
后移动 print 语句来看到这一点)
修复选项包括使 out
有一个大小为 n = 结果数
的缓冲区(即 out := make(chan int, n)
)或使 out
不缓冲并在写入时从 out
进行读取。
done
频道也没有被正确使用。main
和workerpool
都依赖它来停止执行,但没有任何内容被写入其中!它也是无缓冲的,因此也会遇到上述死锁问题。
要解决此问题,您首先可以从 workerpool
中删除 case <-done:
并简单地通过 in
进行范围,因为它在 main
中关闭。然后可以将done
设置为缓冲通道来解决死锁。
结合这些修复可以得到:
package main
import (
"fmt"
"sync"
)
func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
out := make(chan int, 100)
var wg sync.WaitGroup
for i := 0; i < numberOfWorkers; i++ {
fmt.Println("new worker")
wg.Add(1)
// fan out worker goroutines reading from in channel and
// send output into out channel
go func() {
defer wg.Done()
for data := range in {
// fan-in job execution multiplexing results into the results channel
fmt.Println("running func", data)
value := fn(data)
fmt.Println("sending value out", value)
out <- value
}
fmt.Println("no more items")
return
}()
}
fmt.Println("waiting")
wg.Wait()
fmt.Println("done waiting")
close(out)
done <- true
close(done)
return out
}
func main() {
done := make(chan bool, 1)
in := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println("sending", i)
in <- i
}
close(in)
}()
out := workerPool(done, in, 2, func(i int) int {
return i
})
for {
select {
case o, ok := <-out:
if !ok {
continue
}
fmt.Println("output", o)
case <-done:
return
}
}
}
这可以解决您的问题,但这不是使用频道的最佳方式!结构本身可以更改得更简单,而不必依赖缓冲通道。
以上就是多个 goroutine 从同一通道读取的详细内容,更多请关注编程网其它相关文章!