文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Gosingleflight使用以及原理

2023-01-02 12:00

关注

这个东西很重要,可以经常用在项目当中,所以我们单独拿出来进行讲解。

在使用它之前我们需要导包:

 go get golang.org/x/sync/singleflight

golang/sync/singleflight.Group 是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。

但是 golang/sync/singleflight.Group 能有效地解决这个问题,它能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。

使用方法

singleflight类的使用方法就新建一个singleflight.Group,使用其方法Do或者DoChan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。

Group结构体

代表一类工作,同一个group中,同样的key同时只能被执行一次

Do方法

func (g *Group) Do(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) (v interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, err error, shared bool)

key:同一个key,同时只有一个协程执行

fn:被包装的函数

v:返回值,即执行结果。其他等待的协程都会拿到

shared:表示是否由其他协程得到了这个结果v

DoChan方法

func (g *Group) DoChan(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) <-chan Result

Do差不多其实,因此我们就只讲解Do的实际应用场景了。

具体应用场景

var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
	log.Printf("request %s start to get and set cache...", r.URL)
	value, err, _ := singleSetCache.Do(cacheKey, func() (interface{}, error) {
		log.Printf("request %s is getting cache...", r.URL)
		time.Sleep(3 * time.Second)
		log.Printf("request %s get cache success!", r.URL)
		return cacheKey, nil
	})
	return value.(string), err
}
func main() {
	r := gin.Default()
	r.GET("/sekill/:id", func(context *gin.Context) {
		ID := context.Param("id")
		cache, err := GetAndSetCache(context.Request, ID)
		if err != nil {
			log.Println(err)
		}
		log.Printf("request %s get value: %v", context.Request.URL, cache)
	})
	r.Run()
}

来看一下执行结果:

2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:19 request /sekill/9 start to get and set cache...
2022/12/29 16:21:19 request /sekill/5 start to get and set cache...
2022/12/29 16:21:21 request /sekill/9 get cache success!
2022/12/29 16:21:21 request /sekill/5 get cache success!
2022/12/29 16:21:21 request /sekill/5 get value: 5
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    3.0106529s |       127.0.0.1 | GET      "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.8090881s |       127.0.0.1 | GET      "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.2166003s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.6064069s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.4178652s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.8101267s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    3.0116892s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.6074537s |       127.0.0.1 | GET      "/sekill/5"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.4076473s |       127.0.0.1 | GET      "/sekill/5"
[GIN] 2022/12/29 - 16:21:21 | 200 |     2.218686s |       127.0.0.1 | GET      "/sekill/5"

可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。

接下来我们来看一下它的原理吧!

原理

首先来看一下Group结构体:

type Group struct {
   mu sync.Mutex  // 锁保证并发安全   
   m  map[string]*call //保存key对应的函数执行过程和结果的变量。
}

然后我们来看一下call结构体:

type call struct {
    wg sync.WaitGroup //用WaitGroup实现只有一个协程执行函数
    val interface{} //函数执行结果
    err error
    forgotten bool
    dups  int  //含义是duplications,即同时执行同一个key的协程数量
    chans []chan<- Result
}

然后我们来看一下Do方法:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    // 写Group的m字段时,加锁保证写安全
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
        // 如果key已经存在,说明已经由协程在执行,则dups++并等待其执行结果,执行结果保存在对应的call的val字段里
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
    // 如果key不存在,则新建一个call,并使用WaitGroup来阻塞其他协程,同时在m字段里写入key和对应的call
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	g.doCall(c, key, fn) // 进来的第一个协程就来执行这个函数
	return c.val, c.err, c.dups > 0
}

然后我们来分析一下doCall函数:

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	c.val, c.err = fn()
	c.wg.Done()
	g.mu.Lock()
	delete(g.m, key)
	for _, ch := range c.chans {
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	g.mu.Unlock()
}

问题分析

分析了源码之后,我们得出了一个结论,这个东西是用阻塞来实现的,这就引发了一个问题:如果我们处理的那个请求刚好遇到问题了,那么后面的所有请求都会被阻塞,也就是,我们应该加上适合的超时控制,如果在一定时间内,没有获得结果,那么就当作超时处理。

于是这个适合我们应该使用DoChan()。两者实现上完全一样,不同的是, DoChan() 通过 channel 返回结果。因此可以使用 select 语句实现超时控制。

var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
   log.Printf("request %s start to get and set cache...", r.URL)
   retChan := singleSetCache.DoChan(cacheKey, func() (interface{}, error) {
      log.Printf("request %s is getting cache...", r.URL)
      time.Sleep(3 * time.Second)
      log.Printf("request %s get cache success!", r.URL)
      return cacheKey, nil
   })
   var ret singleflight.Result
   timeout := time.After(2 * time.Second)
   select {
   case <-timeout:
      log.Println("time out!")
      return "", errors.New("time out")
   case ret = <-retChan: // 从chan中获取结果
      return ret.Val.(string), ret.Err
   }
}
func main() {
   r := gin.Default()
   r.GET("/sekill/:id", func(context *gin.Context) {
      ID := context.Param("id")
      cache, err := GetAndSetCache(context.Request, ID)
      if err != nil {
         log.Println(err)
      }
      log.Printf("request %s get value: %v", context.Request.URL, cache)
   })
   r.Run()
}

补充

这里其实还有一个Forget方法,它可以在映射表中删除某个键,接下来对键的调用就不会等待前面的函数返回了。

总结

当然,如果单次的失败无法容忍,在高并发的场景下更好的处理方案是:

放弃使用同步请求,牺牲数据更新的实时性

“缓存” 存储准实时的数据 + “异步更新” 数据到缓存

到此这篇关于Go singleflight使用以及原理的文章就介绍到这了,更多相关Go singleflight内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯