文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

多个 goroutine 从同一通道读取

2024-02-09 16:16

关注

php小编草莓在本文中将为大家介绍多个goroutine从同一通道读取的相关内容。在并发编程中,goroutine是Go语言中的轻量级线程,可以同时执行多个任务。通道是goroutine之间进行通信的重要方式。当多个goroutine需要从同一个通道读取数据时,我们需要注意一些问题,并采取相应的措施来确保程序的正确性和效率。在接下来的内容中,我们将详细解释这个过程,并提供一些实用的技巧和建议。

问题内容

考虑生成多个 goroutine 以从同一通道读取值。两个工作人员按预期生成,但只从通道中读取一项并停止读取。我期望 goroutine 继续从通道读取数据,直到将值发送到通道的 goroutine 关闭为止。尽管某些东西阻止了发送者发送,但生成项目的 goroutine 并未关闭。为什么每个工人只读取一个值并停止?

输出显示发送的两个值,每个工作 goroutine 各读取一个值。第三个值已发送,但未从任何一个工作线程中读取。

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0

去游乐场

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("recieved done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}

解决方法

之前关于通道未缓冲的评论是正确的,但还存在其他同步问题。

无缓冲通道本质上意味着写入值时,必须在发生任何其他写入之前接收该值。

  1. workerpool 创建一个无缓冲通道 out 来存储结果,但只有在所有结果写入 out 后才返回。但由于从 out 通道的读取发生在 out 返回之后,并且 out 没有缓冲,因此 workerpool 在尝试写入时被阻塞,从而导致死锁。这就是为什么看起来每个工作人员只发送一个值;实际上,在发送第一个之后,所有工作人员都被阻止,因为没有任何东西可以接收该值(您可以通过在写入 out 后移动 print 语句来看到这一点)

修复选项包括使 out 有一个大小为 n = 结果数 的缓冲区(即 out := make(chan int, n))或使 out 不缓冲并在写入时从 out 进行读取。

  • done 频道也没有被正确使用。 mainworkerpool 都依赖它来停止执行,但没有任何内容被写入其中!它也是无缓冲的,因此也会遇到上述死锁问题。
  • 要解决此问题,您首先可以从 workerpool 中删除 case <-done: 并简单地通过 in 进行范围,因为它在 main 中关闭。然后可以将done设置为缓冲通道来解决死锁。

    结合这些修复可以得到:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
        out := make(chan int, 100)
        var wg sync.WaitGroup
    
        for i := 0; i < numberOfWorkers; i++ {
            fmt.Println("new worker")
            wg.Add(1)
            // fan out worker goroutines reading from in channel and
            // send output into out channel
            go func() {
                defer wg.Done()
                for data := range in {
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
    
                }
                fmt.Println("no more items")
                return
            }()
        }
    
        fmt.Println("waiting")
        wg.Wait()
        fmt.Println("done waiting")
        close(out)
        done <- true
        close(done)
        return out
    }
    
    func main() {
        done := make(chan bool, 1)
    
        in := make(chan int)
    
        go func() {
            for i := 0; i < 10; i++ {
                fmt.Println("sending", i)
                in <- i
            }
            close(in)
        }()
    
        out := workerPool(done, in, 2, func(i int) int {
            return i
        })
    
        for {
            select {
            case o, ok := <-out:
                if !ok {
                    continue
                }
    
                fmt.Println("output", o)
            case <-done:
                return
            }
        }
    
    }

    这可以解决您的问题,但这不是使用频道的最佳方式!结构本身可以更改得更简单,而不必依赖缓冲通道。

    以上就是多个 goroutine 从同一通道读取的详细内容,更多请关注编程网其它相关文章!

    阅读原文内容投诉

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    软考中级精品资料免费领

    • 历年真题答案解析
    • 备考技巧名师总结
    • 高频考点精准押题
    • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

      难度     813人已做
      查看
    • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

      难度     354人已做
      查看
    • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

      难度     318人已做
      查看
    • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

      难度     435人已做
      查看
    • 2024年上半年系统架构设计师考试综合知识真题

      难度     224人已做
      查看

    相关文章

    发现更多好内容

    猜你喜欢

    AI推送时光机
    位置:首页-资讯-后端开发
    咦!没有更多了?去看看其它编程学习网 内容吧
    首页课程
    资料下载
    问答资讯