文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

GoLangchannel底层代码实现详解

2024-04-02 19:55

关注

1.channel 简介

Go语言有个很出名的话是“以通信的手段来共享内存”,channel就是其最佳的体现,channel提供一种机制,可以同步两个并发执行的函数,还可以让两个函数通过互相传递特定类型的值来通信

channel有两种初始化方式,分别是带缓存的和不带缓存的:

make(chan int)   // 无缓存 chan
make(chan int, 10)  // 有缓存 chan

使用方式也很简单:

c := make(chan int)
defer close(c)
go func(){
  c <- 5   // send
}()
n := <- c   // recv

十分简洁的做到了不同协程的交互。

2.channel 内部结构

chan的实现在runtime/chan.go,是一个hchan的结构体:

type hchan struct {
  qcount   uint           // 队列中的数据个数
  dataqsiz uint           // 环形队列的大小,channel本身是一个环形队列
  buf      unsafe.Pointer // 存放实际数据的指针,用unsafe.Pointer存放地址,为了避免gc
  elemsize uint16 
  closed   uint32 // 标识channel是否关闭
  elemtype *_type // 数据 元素类型
  sendx    uint   // send的 index
  recvx    uint   // recv 的 index
  recvq    waitq  // 阻塞在 recv 的队列
  sendq    waitq  // 阻塞在 send 的队列
  lock mutex  // 锁 
}

可以看出,channel本身是一个环形缓冲区,数据存放到堆上面,channel的同步是通过锁实现的,并不是想象中的lock-free的方式,channel中有两个队列,一个是发送阻塞队列,一个是接收阻塞队列。当向一个已满的channel发送数据会被阻塞,此时发送协程会被添加到sendq中,同理,当向一个空的channel接收数据时,接收协程也会被阻塞,被置入recvq中。

waitq是一个链表,里面对g结构做了一下简单的封装。

3.创建channel

当我们在代码里面通过make创建一个channel时,实际调用的是下面这个函数:

CALL runtime.makechan(SB)

makechan的实现如下所示:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem
  // 判断 元素类型的大小
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  // 判断对齐限制
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }
  // 判断 size非负 和 是否大于 maxAlloc限制
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }
  var c *hchan
  switch {
  case mem == 0: // 无缓冲区,即 make没设置大小
    c = (*hchan)(mallocgc(hchanSize, nil, true)) 
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:  // 数据类型不包含指针
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:  // 如果包含指针
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }
  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)
  if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
  }
  return c
}

根据上面的代码,我们可以看到,创建channel分为三种情况:

1.第一种缓冲区大小为0,此时只需要分配hchansize大小的内存就ok

2.第二种缓冲区大小不为0,且channel的类型不包含指针,此时buf为hchanSize+元素大小*元素个数的连续内存

3.第三种缓冲区大小不为0,且channel的类型包含指针,则不能简单的根据元素的大小去申请内存,需要通过mallocgc去分配内存

4.发送数据

发送数据会调用chan.go中的如下接口:

CALL runtime.chansend1(SB)

chansend1会调用chansend接口,chansend方法签名如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

c是具体的channel,ep是发送的数据,block为true表示阻塞的发送,一般向channel发送数据都是阻塞的,如果channel数据满了,会一直阻塞在这里。但是在select中如果有case监听某个channel的发送,那么此时的block参数为false,后续分析select实现会讲到。

select {
    case <-c:  // 这里为非阻塞发送
        // do some thing
    default:
        // do some thing
}

chansend接口会对一些条件做判断

如果向一个为nil的channel发送数据,如果是阻塞发送会一直阻塞:

 if c == nil {
    if !block {
      return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }

首先会加锁,保证原子性,如果向一个已关闭的channel发送数据就会panic。

lock(&c.lock)
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

如果此时recvq中有等待协程,就直接调用send函数将数据复制给接收方, 实现如下:

// sg 为接收者协程,ep为发送元素
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if raceenabled {
    if c.dataqsiz == 0 {
      racesync(c, sg)
    } else {
      qp := chanbuf(c, c.recvx)
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
        c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
  }
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

如果此时没有等待协程,并且数据未满的情况下,就将数据copy到环形缓冲区中,将位置后移一位。

if c.qcount < c.dataqsiz {  // 如果 未满
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }

如果此时环形缓冲区数据满了,如果是阻塞发送,此时会把发送方放到sendq队列中。

5.接收数据

接收数据会调用下面的接口:

CALL runtime.chanrecv1(SB)

chanrecv1会调用chanrecv接口,chanrecv方法签名如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

c 指需要操作的channel,接收的数据会写到ep中,block与send中的情况一样,表示是阻塞接收还是非阻塞接收,非阻塞接收指在select中case 接收一个channel值:

select {
    case a := <-c:  // 这里为非阻塞接收,没有数据直接返回
        // do some thing
    default:
        // do some thing
}

首先chanrecv也会做一些参数校验

如果channel为nil并且是非阻塞模式,直接返回,如果是阻塞模式,永远等待

 if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }

随后会加锁,防止竞争读写

lock(&c.lock)

如果向一个已关闭的channel接收数据,此时channel里面还有数据,那么依然可以接收数据,属于正常接收数据情况。

如果向一个已关闭的channel接收数据,此时channel里面没有数据,那么此时返回的是(true,false),表示有值返回,但不是我们需要的值:

 if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)  // 将 ep 指向的内存块置 0
    }
    return true, false
  }

接收也分为三种情况:

如果此时 sendq中有发送方在阻塞,此时会调用recv函数:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
    }
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

此时有发送方在等待,表示此时channel中数据已满,这个时候会将channel头部的数据copy到接收方,然后将发送方队列头部的发送者的数据copy到那个位置。这涉及到两次copy操作。

第二种情况是如果没有发送方等待,此时会把数据copy到channel中:

if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

第三种情况如果channel里面没有数据,如果是非阻塞接收直接返回false,如果是阻塞接收会将接收方协程放入channel的recvq中。

6.关闭channel

关闭channel时会调用如下接口:

func closechan(c *hchan) 

首先会做一些数据校验:

if c == nil {
    panic(plainError("close of nil channel"))
  }
  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
  if raceenabled {
    callerpc := getcallerpc()
    racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
    racerelease(c.raceaddr())
  }
  c.closed = 1   //置关闭标记位

如果向一个为nil的channel或者向一个已关闭的channel发起close操作就会panic。

随后会唤醒所有在recvq或者sendq里面的协程:

var glist gList
  // release all readers
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  // release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)

如果存在接收者,将接收数据通过typedmemclr置0。

如果存在发送者,将所有发送者panic。

7.总结

综上分析,在使用channel有这么几点要注意

1.确保所有数据发送完后再关闭channel,由发送方来关闭

2.不要重复关闭channel

3.不要向为nil的channel里面发送值

4.不要向为nil的channel里面接收值

5.接收数据时,可以通过返回值判断是否ok

n , ok := <- c
if ok{
  // do some thing
}

这样防止channel被关闭后返回了零值,对业务造成影响

到此这篇关于GoLang channel底层代码实现详解的文章就介绍到这了,更多相关GoLang channel内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯