文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Golang channel如何应用

2023-07-04 11:02

关注

这篇文章主要介绍“Golang channel如何应用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Golang channel如何应用”文章能帮助大家解决问题。

前言

channel是用于 goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

整体结构

Go channel的数据结构如下所示:

type hchan struct {    qcount   uint           // total data in the queue    dataqsiz uint           // size of the circular queue    buf      unsafe.Pointer // points to an array of dataqsiz elements    elemsize uint16    closed   uint32    elemtype *_type // element type    sendx    uint   // send index    recvx    uint   // receive index    recvq    waitq  // list of recv waiters    sendq    waitq  // list of send waiters    lock mutex}

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

创建channel会被编译器编译为调用makechan函数

// 无缓冲通道ch2 := make(chan int)// 有缓冲通道ch3 := make(chan int, 10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

func makechan(t *chantype, size int) *hchan {   elem := t.elem       // mem:缓冲区大小    mem, overflow := math.MulUintptr(elem.size, uintptr(size))   if overflow || mem > maxAlloc-hchanSize || size < 0 {      panic(plainError( "makechan: size out of range" ))   }   var c *hchan   switch {   // 缓冲区大小为空,只申请hchanSize大小的内存   case mem == 0:       c = (*hchan)(mallocgc(hchanSize, nil, true))       c.buf = c.raceaddr()   // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存   case elem.ptrdata == 0:       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))       c.buf = add(unsafe.Pointer(c), hchanSize)   // 否则就是带缓存,且有指针,分配两次内存   default:      // Elements contain pointers.       c = new(hchan)       c.buf = mallocgc(mem, elem, true)   }       // 保存元素类型,元素大小,容量   c.elemsize = uint16(elem.size)   c.elemtype = elem   c.dataqsiz = uint(size)   lockInit(&c.lock, lockRankHchan)      return c}

发送

执行以下代码时:

ch <- 3

编译器会转化为对chansend的调用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {   // 如果channel是空   if c == nil {      // 非阻塞,直接返回      if !block {         return  false      }      // 否则阻塞当前协程      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)      throw( "unreachable" )   }   // 非阻塞,没有关闭,且容量满了,无法发送,直接返回   if !block && c.closed == 0 && full(c) {      return  false   }   // 加锁   lock(&c.lock)   // 如果已经关闭,无法发送,直接panic   if c.closed != 0 {      unlock(&c.lock)      panic(plainError( "send on closed channel" ))   }   // 从接收队列弹出一个协程的包装结构sudog   if sg := c.recvq.dequeue(); sg != nil {      // 如果能弹出,即有等到接收的协程,说明:      // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待      // 将要发送的数据拷贝到该协程的接收指针上      send(c, sg, ep, func() { unlock(&c.lock) }, 3)      return  true}   // 缓冲区还有空间   if c.qcount < c.dataqsiz {      // qp:计算要发送到的位置的地址      qp := chanbuf(c, c.sendx)      // 将数据从ep拷贝到qp      typedmemmove(c.elemtype, qp, ep)      // 待发送位置移动      c.sendx++      // 由于是数组模拟队列,sendx到顶了需要归零      if c.sendx == c.dataqsiz {         c.sendx = 0      }      // 缓冲区数量++      c.qcount++      unlock(&c.lock)      return  true}   // 往下就是缓冲区无数据,也没有等到接收协程的情况了      // 如果是非阻塞模式,直接返回   if !block {      unlock(&c.lock)      return  false    }   // 将当前协程包装成sudog,阻塞到channel上   gp := getg()   mysg := acquireSudog()   mysg.releasetime = 0   if t0 != 0 {      mysg.releasetime = -1   }     mysg.elem = ep   mysg.waitlink = nil   mysg.g = gp   mysg.isSelect = false   mysg.c = c   gp.waiting = mysg   gp.param = nil      // 当前协程进入发送等待队列   c.sendq.enqueue(mysg)   atomic.Store8(&gp.parkingOnChan, 1)   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)    // 被唤醒后从这里开始执行      KeepAlive(ep)   if mysg != gp.waiting {      throw( "G waiting list is corrupted" )   }   gp.waiting = nil   gp.activeStackChans = false   closed := !mysg.success   gp.param = nil   if mysg.releasetime > 0 {      blockevent(mysg.releasetime-t0, 2)   }   mysg.c = nil   releaseSudog(mysg)   // 被唤醒后发现channel关闭了,panic   if closed {      if c.closed == 0 {         throw( "chansend: spurious wakeup" )      }      panic(plainError( "send on closed channel" ))   }   return  true}

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

type sudog struct {   // 协程   g *g   // 前一个,后一个指针   next *sudog   prev *sudog   // 等到发送的数据在哪,等待从哪个位置接收数据   elem unsafe.Pointer   acquiretime int64   releasetime int64   ticket      uint32   isSelect bool   success bool   parent   *sudog // semaRoot binary tree   waitlink *sudog // g.waiting list or semaRoot   waittail *sudog // semaRoot   // 在哪个channel上等待   c        *hchan // channel}

其目的是:

来看看一些子函数:

判断channel是否是满的

func full(c *hchan) bool {   // 无缓冲   if c.dataqsiz == 0 {      // 并且没有其他协程在等待      return c.recvq.first == nil   }   // 有缓冲,但容量装满了   return c.qcount == c.dataqsiz}

send方法:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {   // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem   if sg.elem != nil {      sendDirect(c.elemtype, sg, ep)      sg.elem = nil   }   gp := sg.g   unlockf()   gp.param = unsafe.Pointer(sg)   sg.success = true   if sg.releasetime != 0 {      sg.releasetime = cputicks()   }   // 唤醒该接收者协程   goready(gp, skip+1)}

接收

从channel中接收数据有几种写法:

根据带不带ok,决定用下面哪个方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {        chanrecv(c, elem, true)}func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {        _, received = chanrecv(c, elem, true)        return}

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞   if c == nil {      if !block {         return   }      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)      throw( "unreachable" )   }   // 非阻塞,并且channel为空   if !block && empty(c) {      // 如果还没关闭,直接返回   if atomic.Load(&c.closed) == 0 {      return   }      // 否则已经关闭,      // 如果为空,返回该类型的零值   if empty(c) {     if ep != nil {        typedmemclr(c.elemtype, ep)     }     return  true, false       }   }   lock(&c.lock)      // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值   if c.closed != 0 && c.qcount == 0 {      unlock(&c.lock)      if ep != nil {         typedmemclr(c.elemtype, ep)      }      return  true, false}       // 如果有发送者正在阻塞,说明:   // 1.无缓冲   // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待   if sg := c.sendq.dequeue(); sg != nil {      // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)      return  true, true}       // 如果缓存区有数据,    if c.qcount > 0 {      // qp为缓冲区中下一次接收的位置      qp := chanbuf(c, c.recvx)      // 将数据从qp拷贝到ep      if ep != nil {         typedmemmove(c.elemtype, ep, qp)      }      typedmemclr(c.elemtype, qp)      c.recvx++      if c.recvx == c.dataqsiz {         c.recvx = 0      }      c.qcount--      unlock(&c.lock)      return  true, true}   // 接下来就是既没有发送者在等待,也缓冲区也没数据   if !block {      unlock(&c.lock)      return  false, false}   // 将当前协程包装成sudog,阻塞到channel中   gp := getg()   mysg := acquireSudog()   mysg.releasetime = 0   if t0 != 0 {      mysg.releasetime = -1   }   // 记录接收地址   mysg.elem = ep   mysg.waitlink = nil   gp.waiting = mysg   mysg.g = gp   mysg.isSelect = false   mysg.c = c   gp.param = nil   c.recvq.enqueue(mysg)   atomic.Store8(&gp.parkingOnChan, 1)   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)   // 从这里唤醒   if mysg != gp.waiting {      throw( "G waiting list is corrupted" )   }   gp.waiting = nil   gp.activeStackChans = false   if mysg.releasetime > 0 {      blockevent(mysg.releasetime-t0, 2)   }   success := mysg.success   gp.param = nil   mysg.c = nil   releaseSudog(mysg)   return  true, success}

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {   // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep   if c.dataqsiz == 0 {      if ep != nil {         recvDirect(c.elemtype, sg, ep)      }   // 接下来是有缓冲,且缓冲区满的情况      } else {      // qp为channel缓冲区中,接收者下一次接收的地址   qp := chanbuf(c, c.recvx)      // 将数据从qp拷贝到ep   if ep != nil {         typedmemmove(c.elemtype, ep, qp)    }    // 将发送者的数据从sg.elem拷贝到qp    typedmemmove(c.elemtype, qp, sg.elem)    c.recvx++    if c.recvx == c.dataqsiz {       c.recvx = 0    }    // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx    c.sendx = c.recvx }   sg.elem = nil   gp := sg.g   unlockf()   gp.param = unsafe.Pointer(sg)   sg.success = true   if sg.releasetime != 0 {      sg.releasetime = cputicks()   }   // 唤醒发送者   goready(gp, skip+1)}

关闭

func closechan(c *hchan) {   // 不能关闭空channel   if c == nil {      panic(plainError( "close of nil channel" ))   }   lock(&c.lock)   // 不能重复关闭   if c.closed != 0 {      unlock(&c.lock)      panic(plainError( "close of closed channel" ))   }   // 修改关闭状态   c.closed = 1   var glist gList   // 释放所有的接收者协程,并为它们赋予零值 for {      sg := c.recvq.dequeue()      if sg == nil {         break      }      if sg.elem != nil {         typedmemclr(c.elemtype, sg.elem)         sg.elem = nil      }      if sg.releasetime != 0 {         sg.releasetime = cputicks()      }      gp := sg.g      gp.param = unsafe.Pointer(sg)      sg.success = false      glist.push(gp)   }   // 释放所有的发送者协程 for {      sg := c.sendq.dequeue()      if sg == nil {         break     }      sg.elem = nil      if sg.releasetime != 0 {         sg.releasetime = cputicks()      }      gp := sg.g      gp.param = unsafe.Pointer(sg)      sg.success = false      glist.push(gp)   }   unlock(&c.lock)   // 执行唤醒操作 for !glist.empty() {      gp := glist.pop()      gp.schedlink = 0      goready(gp, 3)   }}

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel

关于“Golang channel如何应用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯