文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Go如何创建Grpc链接池

2023-07-05 08:55

关注

这篇“Go如何创建Grpc链接池”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Go如何创建Grpc链接池”文章吧。

常规用法

gRPC 四种基本使用

常见的gRPC调用写法

func main(){//... some code// 链接grpc服务conn , err := grpc.Dial(":8000",grpc.WithInsecure)if err != nil {//...log}defer conn.Close()//...some code

存在的问题:面临高并发的情况,性能问题很容易就会出现,例如我们在做性能测试的时候,就会发现,打一会性能测试,客户端请求服务端的时候就会报错:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

实际去查看问题的时候,很明显,这是 gRPC 的连接数被打满了,很多连接都还未完全释放。[#本文来源:janrs.com#]

gRPC 的通信本质上也是 TCP 的连接,那么一次连接就需要三次握手,和四次挥手,每一次建立连接和释放连接的时候,都需要走这么一个过程,如果我们频繁的建立和释放连接,这对于资源和性能其实都是一个大大的浪费。

在服务端,gRPC 服务端的链接管理不用我们操心,但是 gRPC 客户端的链接管理非常有必要关心,要实现复用客户端的连接。

创建链接池

创建链接池需要考虑的问题:

创建链接池接口

type Pool interface {// 获取一个新的连接 , 当关闭连接的时候,会将该连接放入到池子中   Get() (Conn, error)// 关闭连接池,自然连接池子中的连接也不再可用   Close() error//[#本文来源:janrs.com#]   Status() string}

实现链接池接口

创建链接池代码

func New(address string, option Options) (Pool, error) {   if address == "" {      return nil, errors.New("invalid address settings")   }   if option.Dial == nil {      return nil, errors.New("invalid dial settings")   }   if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {      return nil, errors.New("invalid maximum settings")   }   if option.MaxConcurrentStreams <= 0 {      return nil, errors.New("invalid maximun settings")   }   p := &pool{      index:   0,      current: int32(option.MaxIdle),      ref:     0,      opt:     option,      conns:   make([]*conn, option.MaxActive),      address: address,      closed:  0,   }   for i := 0; i < p.opt.MaxIdle; i++ {      c, err := p.opt.Dial(address)      if err != nil {         p.Close()         return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)      }      p.conns[i] = p.wrapConn(c, false)   }   log.Printf("new pool success: %v\n", p.Status())   return p, nil}

关于以上的代码,需要特别注意每一个连接的建立也是在 New 里面完成的,[#本文来源:janrs.com#]只要有 1 个连接未建立成功,那么咱们的连接池就算是建立失败,咱们会调用 p.Close() 将之前建立好的连接全部释放掉。

关闭链接池代码

// 关闭连接池func (p *pool) Close() error {   atomic.StoreInt32(&p.closed, 1)   atomic.StoreUint32(&p.index, 0)   atomic.StoreInt32(&p.current, 0)   atomic.StoreInt32(&p.ref, 0)   p.deleteFrom(0)   log.Printf("[janrs.com]close pool success: %v\n", p.Status())   return nil}

从具体位置删除链接池代码

// 清除从 指定位置开始到 MaxActive 之间的连接func (p *pool) deleteFrom(begin int) {   for i := begin; i < p.opt.MaxActive; i++ {      p.reset(i)   }}

销毁具体的链接代码

// 清除具体的连接func (p *pool) reset(index int) {   conn := p.conns[index]   if conn == nil {      return   }   conn.reset()   p.conns[index] = nil}

关闭链接

代码

func (c *conn) reset() error {   cc := c.cc   c.cc = nil   c.once = false   // 本文博客来源:janrs.com   if cc != nil {      return cc.Close()   }   return nil}func (c *conn) Close() error {   c.pool.decrRef()   if c.once {      return c.reset()   }   return nil}

在使用连接池通过 pool.Get() 拿到具体的连接句柄 conn 之后,会使用 conn.Close()关闭连接,实际上也是会走到上述的 Close() 实现的位置,但是并未指定当然也没有权限显示的指定将 once 置位为 false ,也就是对于调用者来说,是关闭了连接,对于连接池来说,实际上是将连接归还到连接池中。

扩缩容

关键代码

func (p *pool) Get() (Conn, error) {   // the first selected from the created connections   nextRef := p.incrRef()   p.RLock()   current := atomic.LoadInt32(&p.current)   p.RUnlock()   if current == 0 {      return nil, ErrClosed   }   if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {      next := atomic.AddUint32(&p.index, 1) % uint32(current)      return p.conns[next], nil   }   // 本文博客来源:janrs.com   // the number connection of pool is reach to max active   if current == int32(p.opt.MaxActive) {      // the second if reuse is true, select from pool's connections      if p.opt.Reuse {         next := atomic.AddUint32(&p.index, 1) % uint32(current)         return p.conns[next], nil      }      // the third create one-time connection      c, err := p.opt.Dial(p.address)      return p.wrapConn(c, true), err   }   // the fourth create new connections given back to pool   p.Lock()   current = atomic.LoadInt32(&p.current)   if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {      // 2 times the incremental or the remain incremental  ##janrs.com      increment := current      if current+increment > int32(p.opt.MaxActive) {         increment = int32(p.opt.MaxActive) - current      }      var i int32      var err error      for i = 0; i < increment; i++ {         c, er := p.opt.Dial(p.address)         if er != nil {            err = er            break         }         p.reset(int(current + i))         p.conns[current+i] = p.wrapConn(c, false)      }  // 本文博客来源:janrs.com      current += i      log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %d\n",         p.current, current, increment, p.opt.MaxActive)      atomic.StoreInt32(&p.current, current)      if err != nil {         p.Unlock()         return nil, err      }   }   p.Unlock()   next := atomic.AddUint32(&p.index, 1) % uint32(current)   return p.conns[next], nil}

Get 代码逻辑

也可以在 Get 的实现上进行缩容,具体的缩容策略可以根据实际情况来定,例如当引用计数 nextRef 只有当前活跃连接数的 10% 的时候(这只是一个例子),就可以考虑缩容了。

以上就是关于“Go如何创建Grpc链接池”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯