开篇
在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema:
type Mutex struct {
state int32
sema uint32
}
- state 代表互斥锁的状态,比如是否被锁定;
- sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。
这个 sema 就是 semaphore 信号量的意思。Golang 协程之间的抢锁,实际上争抢给Locked
赋值的权利,能给 Locked
置为1,就说明抢锁成功。抢不到就阻塞等待 sema
信号量,一旦持有锁的协程解锁,那么等待的协程会依次被唤醒。
有意思的是,虽然 semaphore 在锁的实现中起到了至关重要的作用,Golang 对信号量的实现却是隐藏在 runtime 中,并没有包含到标准库里来,在 src 源码中我们可以看到底层依赖的信号量相关函数。
// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
- runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻将 s 减去 1【原子操作】;
- runtime_Semrelease:将 s 增加 1,然后通知一个阻塞在 runtime_Semacquire 的 goroutine【原子操作】。
两个原子操作,一个 acquire,一个 release,其实就代表了对资源的获取和释放。Mutex 作为 sync 包的核心,支撑了 RWMutex,channel,singleflight 等多个并发控制的能力,而对信号量的管理又是 Mutex 的基础。
虽然源码看不到,但 Golang 其实在扩展库 golang.org/x/sync/semaphore
也提供了一套信号量的实现,我们可以由此来参考一下,理解 semaphore 的实现思路。
信号量
在看源码之前,我们先理清楚【信号量】设计背后的场景和原理。
信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。
在 Mutex 依赖的信号量机制中我们可以看到,这里本质就是依赖 sema 一个 uint32 的变量 + 原子操作来实现并发控制能力。当 goroutine 完成对信号量等待时,该变量 -1,当 goroutine 完成对信号量的释放时,该变量 +1。
如果一个新的 goroutine 发现信号量不大于 0,说明资源暂时没有,就得阻塞等待。直到信号量 > 0,此时的语义是有新的资源,该goroutine就会结束等待,完成对信号量的 -1 并返回。注意我们上面有提到,runtime 支持的两个方法都是原子性的,不用担心两个同时在等待的 goroutine 同时抢占同一份资源。
典型的信号量场景是【图书馆借书】。假设学校图书馆某热门书籍现在只有 100 本存货,但是上万学生都想借阅,怎么办?
直接买一万本书是非常简单粗暴的解法,但资源有限,这不是长久之计。
常见的解决方案很简单:学生们先登记,一个一个来。我们先给 100 个同学发出,剩下的你们继续等,等到什么时候借书的同学看完了,把书还回来了,就给排队等待的同学们发放。同时,为了避免超发,每发一个,都需要在维护的记录里将【余量】减去 1,每还回来一个,就把【余量】加上 1。
runtime_Semacquire 就是排队等待借书,runtime_Semrelease 就是看完了把书归还给图书馆。
另外需要注意,虽然我们上面举例的增加/减小的粒度都是 1,但这本质上只是一种场景,事实上就算是图书馆借书,也完全有可能出现一个人同时借了两本一模一样的书。所以,信号量的设计需要支持 N 个资源的获取和释放。
所以,我们对于 acquire 和 release 两种操作的语义如下:
- release: 将信号量增加 n【保证原子性】;
- acquire: 若信号量 < n,阻塞等待,直到信号量 >= n,此时将信号量的值减去 n【保证原子性】。
semaphore 扩展库实现
这里我们结合golang.org/x/sync/semaphore
源码来看看怎样设计出来我们上面提到的信号量结构。
// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64 // 最大资源数
cur int64 // 当前已被使用的资源
mu sync.Mutex
waiters list.List // 等待队列
}
有意思的是,虽然包名是 semaphore,但是扩展库里真正给【信号量结构体】定义的名称是 Weighted。从上面的定义我们可以看到,传入初始资源个数 n(对应 size),就可以生成一个 Weighted 信号量结构。
Weighted 提供了三个方法来实现对信号量机制的支持:
- Acquire
对应上面我们提到的 acquire 语义,注意我们提到过,抽象的来讲,acquire 成功与否其实不太看返回值,而是只要获取不了就一直阻塞,如果返回了,就意味着获取到了。
但在 Golang 实现当中,我们肯定不希望,如果发生了异常 case,导致一直阻塞在这里。所以你可以看到 Acquire 的入参里有个 context.Context,借用 context 的上下文控制能力,你可以对此进行 cancel, 可以设置 timeout 等待超时,就能对 acquire 行为进行更多约束。
所以,acquire 之后我们仍然需要检查返回值 error,如果为 nil,代表正常获取了资源。否则可能是 context 已经不合法了。
- Release
跟上面提到的 release 语义完全一致,传入你要释放的资源数 n,保证原子性地增加信号量。
- TryAcquire
这里其实跟 sync 包中的各类 TryXXX 函数定位很像。并发的机制中大都包含 fast path 和 slow path,比如首个 goroutine 先来 acquire,那么一定是能拿到的,后续再来请求的 goroutine 由于慢了一步,只能走 slow path 进行等待,自旋等操作。sync 包中绝大部分精华,都在于 slow path 的处理。fast path 大多是一个基于 atomic 包的原子操作,比如 CAS 就可以解决。
TryAcquire 跟 Acquire 的区别在于,虽然也是要资源,但是不等待。有了我就获取,就减信号量,返回 trye。但是如果目前还没有,我不会阻塞在这里,而是直接返回 false。
下面我们逐个方法看看,Weighted 是怎样实现的。
Acquire
// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
在阅读之前回忆一下上面 Weighted 结构的定义,注意 Weighted 并没有维护一个变量用来表示【当前剩余的资源】,这一点是通过 size(初始化的时候设置,表示总资源数)减去 cur(当前已被使用的资源),二者作差得到的。
我们来拆解一下上面这段代码:
第一步:这是常规意义上的 fast path
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
- 先上锁,保证并发安全;
- 校验如果 size - cur >= n,代表剩余的资源是足够,同时 waiters 这个等待队列为空,代表没有别的协程在等待;
- 此时就没什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 个资源,然后解锁返回,很直接。
第二步:针对特定场景做提前剪枝
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
如果请求的资源数量,甚至都大于资源总数量了,说明这个协程心里没数。。。。就算我现在把所有初始化的资源都拿回来,也喂不饱你呀!!!那能怎么办,我就不烦劳后面流程处理了,直接等你的 context 什么时候 Done,给你返回 context 的错误就行了,同时先解个锁,别耽误别的 goroutine 拿资源。
第三步:资源是够的,只是现在没有,那就把当前goroutine加到排队的队伍里
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
这里 ready 结构是个空结构体的 channel,仅仅是为了实现协程间通信,通知什么时候资源 ready,建立一个属于这个 goroutine 的 waiter,然后塞到 Weighted 结构的等待队列 waiters 里。
搞定了以后直接解锁,因为你已经来排队了,手续处理完成,以后的路有别的通知机制保证,就没必要在这儿拿着锁阻塞新来的 goroutine 了,人家也得排队。
第四步:排队等待
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
一个 select 语句,只看两种情况:1. 这个 goroutine 的 context 超时了;2. 拿到了资源,皆大欢喜。
重点在于 ctx.Done 分支里 default 的处理。我们可以看到,如果是超时了,此时还没拿到资源,首先会把当前 goroutine 从 waiters 等待队列里移除(合情合理,你既然因为自己的原因做不了主,没法继续等待了,就别耽误别人事了)。
然后接着判断,若这个 goroutine 同时也是排在最前的 goroutine,而且恰好现在有资源了,就赶紧通知队里的 goroutine 们,伙计们,现在有资源了,赶紧来拿。我们来看看这个 notifyWaiters 干了什么:
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
其实很简单,遍历 waiters 这个等待队列,拿到排队最前的 waiter,判断资源够不够,如果够了,增加 cur 变量,资源给你,然后把你从等待队列里移出去,再 close ready 那个goroutine 就行,算是通知一下。
重点部分在于,如果资源不够怎么办?
想象一下现在的处境,Weighted 这个 semaphore 的确有资源,而目前要处理的这个 goroutine 的的确确就是排队最靠前的,而且人家也没狮子大开口,要比你总 size 还大的资源。但是,但是,好巧不巧,现在你要的数量,比我手上有的少。。。。
很无奈,那怎么办呢?
无非两种解法:
- 我先不管你,反正你要的不够,我先看看你后面那个 goroutine 人家够不够,虽然你现在是排位第一个,但是也得继续等着;
- 没办法,你排第一,需求我就得满足,所以我们都继续等,等啥时候资源够了就给你。
扩展库实际选用的是第 2 种策略,即一定要满足排在最前面的 goroutine,这里的考虑在注释里有提到,如果直接继续看后面的 goroutine 够不够,优先满足后面的,在一些情况下会饿死有大资源要求的 goroutine,设计上不希望这样的情况发生。
简单说:要的多不是错,既然你排第一,目前货不多,那就大家一起阻塞等待,保障你的权利。
Release
// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
Release 这里的实现非常简单,一把锁保障不出现并发,然后将 cur 减去 n 即可,说明此时又有 n 个资源回到了货仓。然后和上面 Acquire 一样,调用 notifyWaiters,叫排队第一的哥们(哦不,是 goroutine)来领东西了。
TryAcquire
// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
其实就是 Acquire 方法的 fast path,只是返回了个 bool,标识是否获取成功。
总结
今天我们了解了扩展库 semaphore 对于信号量的封装实现,整体代码加上注释也才 100 多行,是非常好的学习材料,建议大家有空了对着源码再过一遍。Acquire 和 Release 的实现都很符合直觉。
其实,我们使用 buffered channel 其实也可以模拟出来 n 个信号量的效果,但就不具备 semaphore Weighted 这套实现里面,一次获取多个资源的能力了。
以上就是Golang信号量设计实现示例详解的详细内容,更多关于Go信号量设计的资料请关注编程网其它相关文章!