Go语言在设计时即以解决传统并发问题为目标,融入了CSP(Communicating Sequential Processes,通信顺序进程)模型的理念。CSP模型致力于简化并发编程,目标是让编写并发程序的难度与顺序程序相当。
在CSP模型中,通信和同步通过一种特定的流程实现:生产者产生数据,然后通过输出数据到输入/输出原语,最终到达消费者。Go语言为实现CSP模型,特别引入了Channel机制。Goroutine可以通过Channel进行数据的读写操作,Channel作为连接多个Goroutine的通信桥梁,简化了并发编程的复杂性。
虽然CSP模型在Go语言中占据主流地位,但Go同样支持基于共享内存的并发模型。在Go的sync
包中,提供了包括互斥锁、读写锁、条件变量和原子操作等在内的多种同步机制,以满足不同并发场景下的需求。
互斥锁(Mutex)
基本概念
互斥锁(Mutex)是一种用于在并发环境中安全访问共享资源的机制。当一个协程获取到锁时,它将拥有临界区的访问权,而其他请求该锁的协程将会阻塞,直到该锁被释放。
应用场景
并发访问共享资源的情形非常普遍,例如:
- 秒杀系统
- 多个goroutine并发修改某个变量
- 同时更新用户信息
如果没有互斥锁的控制,将会导致商品超卖、变量数值不正确、用户信息更新错误等问题。这时候就需要使用互斥锁来控制并发访问。
基本用法
Mutex实现了Locker接口,提供了两个方法:Lock
和Unlock
。
- Lock方法用于对临界区上锁,获得该锁的协程拥有临界资源的访问权,其他请求临界区的协程会阻塞等待该锁的释放。
- Unlock方法用于解锁,释放锁使其他协程可以访问临界区。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
var count int
increment := func() {
mu.Lock()
defer mu.Unlock()
count++
fmt.Println("Count:", count)
}
for i := 0; i < 5; i++ {
go increment()
}
time.Sleep(time.Second)
}
易错场景
- 不可重入的互斥锁:Go的Mutex是不可重入锁。由于Mutex锁没有记录锁的持有者信息,无法得知谁拥有锁。如果一个获取了锁的协程再次请求锁,将会被阻塞,形成死锁。
func example() {
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
// Do something...
mu.Lock() // 死锁
}
- Lock和Unlock不配对:未正确配对的Lock和Unlock调用会导致死锁。如果对已经锁定的锁再次调用Lock,将会阻塞;对未锁定的Mutex调用Unlock将会panic。
func example() {
var mu sync.Mutex
mu.Lock()
// 未调用mu.Unlock()
mu.Unlock() // 正确
}
- 复制已使用的锁:复制已使用的锁会导致意外的行为。
func example() {
var mu sync.Mutex
copyMu := mu
copyMu.Lock() // 错误
}
基本实现
Mutex结构体有两个字段:state和sema。
type Mutex struct {
state int32
sema uint32
}
State字段的含义
Mutex有以下四种状态。
- mutexLocked:Mutex上锁标志
- mutexWoken:Mutex唤醒标志
- mutexStarving:Mutex正常/饥饿模式标志
- waiterCount:等待者数量
Mutex的正常模式和饥饿模式
- 正常模式:等待者队列遵循先入先出原则,被唤醒的goroutine不会直接获得锁,而是与新请求锁的goroutine竞争锁。新请求锁的goroutine由于正在CPU上执行,获得锁的几率更大,从而减少上下文切换的性能损失。然而,这可能导致被唤醒的goroutine长时间无法获得锁。
- 饥饿模式:当等待时间超过阈值1毫秒时,进入饥饿模式。被唤醒的goroutine被放入等待队列的队首,当前goroutine在调用Unlock释放锁时,会直接将锁交给等待队列的队首,新请求锁的goroutine不会参与竞争,而是排到等待队列的队尾。当等待队列没有goroutine或等待时间小于1毫秒时,Mutex将从饥饿模式切换回正常模式。
代码示例
以下代码展示了如何使用Mutex在并发环境中安全地访问共享资源:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
var count int
increment := func() {
mu.Lock()
defer mu.Unlock()
count++
fmt.Println("Count:", count)
}
for i := 0; i < 5; i++ {
go increment()
}
time.Sleep(time.Second)
}
在上述代码中,多个goroutine同时调用increment函数,通过Mutex来确保对共享变量count的访问是安全的。
读写锁(RWMutex)
基本概念
在并发编程中,为了保证多个协程安全地访问共享资源,我们通常使用Mutex互斥锁。然而,在读多写少的场景下,Mutex会导致性能问题,因为所有操作(包括读操作)都必须串行进行。为了解决这一问题,可以区分读操作和写操作。RWMutex是一种读写锁,同一时间只能被一个写操作持有,或者被多个读操作持有。
基本用法
RWMutex提供了五个方法:Lock、Unlock、RLock、RUnlock和RLocker。
- Lock方法用于在写操作时获取写锁,会阻塞等待当前未释放的写锁。当处于写锁状态时,新的读操作将会阻塞等待。
- Unlock方法用于释放写锁。
- RLock方法用于在读操作时获取读锁,会阻塞等待当前写锁的释放。如果锁处于读锁状态,当前协程也能获取读锁。
- RUnlock方法用于释放读锁。
- RLocker方法用于获取一个Locker接口的对象,调用其Lock方法时会调用RLock方法,调用Unlock方法时会调用RUnlock方法。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var rw sync.RWMutex
var count int
write := func() {
rw.Lock()
defer rw.Unlock()
count++
fmt.Println("Write:", count)
}
read := func() {
rw.RLock()
defer rw.RUnlock()
fmt.Println("Read:", count)
}
// Start multiple readers
for i := 0; i < 5; i++ {
go read()
}
// Start a single writer
go write()
time.Sleep(time.Second)
}
实现原理
RWMutex主要通过readerCount字段来维护读锁的数量。写操作时,会将readerCount减去2的30次方变成一个负数,从而阻塞新的读锁请求。当写锁被释放时,将readerCount加上2的30次方,恢复成一个整数并唤醒等待中的读锁操作。
易错场景
RWMutex的易错场景和Mutex类似,包括以下几点。
- 不可重入锁:Go的RWMutex是不可重入锁。如果一个获取了锁的协程再次请求同一个锁,将会被阻塞,形成死锁。
func example() {
var rw sync.RWMutex
rw.Lock()
defer rw.Unlock()
// Do something...
rw.Lock() // 死锁
}
- Lock和Unlock不配对:未正确配对的Lock和Unlock调用会导致死锁。如果对已经锁定的锁再次调用Lock,将会阻塞;对未锁定的RWMutex调用Unlock将会panic。
func example() {
var rw sync.RWMutex
rw.Lock()
// 未调用rw.Unlock()
rw.Unlock() // 正确
}
- 复制已使用的锁:复制已使用的锁会导致意外行为。
func example() {
var rw sync.RWMutex
copyRw := rw
copyRw.Lock() // 错误
}
- 隐蔽的死锁情景:写锁操作等待旧的读锁的释放,旧的读锁等待新的读锁的释放,新的读锁等待写锁的释放,形成死锁。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var rw sync.RWMutex
var count int
write := func() {
rw.Lock()
defer rw.Unlock()
count++
fmt.Println("Write:", count)
}
read := func() {
rw.RLock()
defer rw.RUnlock()
fmt.Println("Read:", count)
}
// 启动多个读操作
for i := 0; i < 5; i++ {
go read()
}
// 启动写操作
go write()
time.Sleep(time.Second)
}
在上述代码中,多个goroutine同时调用read函数,通过RWMutex来确保对共享变量count的读取是安全的。同时,write函数用于更新共享变量count,确保在写操作时独占访问权。
死锁
什么是死锁
死锁指的是一组进程由于相互持有和等待资源,导致无法继续执行的状态。在这种情况下,所有相关的进程都会无限期阻塞,无法向前推进。具体来说,死锁发生在一个进程持有某些资源并等待其他进程释放其占有的资源,同时这些其他进程也在等待第一个进程释放资源,形成相互等待的状态。
死锁的必要条件
死锁的发生需要满足以下四个必要条件。
- 互斥条件:资源同一时间只能被一个进程所拥有。
- 请求和保持条件:一个进程已经拥有某些资源,但在等待其他资源时不释放已持有的资源。
- 不可剥夺条件:进程持有的资源在未使用完毕前,不能被强行剥夺,只能由进程自己释放。
- 循环等待条件:存在一个进程集合中的每个进程都在等待另一个进程所持有的资源,形成一个循环等待链。
如何解决死锁问题
为了解决死锁问题,可以采取以下两种策略。
- 检测和恢复:系统可以定期检测死锁的存在,并采取措施恢复。例如,通过回滚进程的一部分操作或强制剥夺资源。
- 破坏死锁的必要条件是,可以通过设计系统来破坏死锁的四个必要条件之一,例如:
- 破坏互斥条件:尽量使用共享资源来减少互斥性。
- 破坏请求和保持条件:在进程开始时一次性请求所有资源,或者在请求新的资源之前释放已持有的资源。
- 破坏不可剥夺条件:设计成可以强制剥夺资源,如通过优先级调度。
- 破坏循环等待条件:对资源进行排序,并要求进程按序请求资源,避免形成循环等待。
示例代码
以下是一个Go语言中的死锁示例,展示了两个goroutine由于相互等待对方持有的资源而导致的死锁:
package main
import (
"fmt"
"sync"
)
func main() {
var mutexA, mutexB sync.Mutex
go func() {
mutexA.Lock()
fmt.Println("Goroutine 1: Locked mutexA")
// Simulate some work
mutexB.Lock()
fmt.Println("Goroutine 1: Locked mutexB")
mutexB.Unlock()
mutexA.Unlock()
}()
go func() {
mutexB.Lock()
fmt.Println("Goroutine 2: Locked mutexB")
// Simulate some work
mutexA.Lock()
fmt.Println("Goroutine 2: Locked mutexA")
mutexA.Unlock()
mutexB.Unlock()
}()
// Wait for goroutines to finish (they won't due to deadlock)
select {}
}
在上述代码中,两个goroutine分别持有mutexA
和mutexB
,并且尝试获取对方的锁,导致死锁发生。每个goroutine无限期等待对方释放资源,形成相互等待的循环。
通过了解死锁的概念、必要条件及解决策略,我们可以更好地设计并发程序,避免陷入死锁状态。
WaitGroup
基本概念
WaitGroup 是 Go 语言的 sync 包下提供的一种并发原语,用来解决并发编排的问题。它主要用于等待一组 goroutine 完成。假设一个大任务需要等待三个小任务完成才能继续执行,如果采用轮询的方法,可能会导致两个问题:一是小任务已经完成但大任务需要很久才能被轮询到,二是轮询会造成 CPU 资源的浪费。因此,WaitGroup 通过阻塞等待并唤醒大任务的 goroutine 来解决这个问题。
基本用法
WaitGroup 提供了三个方法:Add、Done 和 Wait。
- Add(delta int):将计数器增加 delta 值。
- Done():将计数器的值减一,相当于 Add(-1)。
- Wait():阻塞等待,直到计数器的值变为 0,然后唤醒调用者。
实现原理
WaitGroup 维护了两个计数器,一个是 v 计数器,另一个是 w 计数器。
- 调用 Add 方法时,v 计数器的值会增加相应的 delta 值。
- 调用 Done 方法时,v 计数器的值会减一。
- 调用 Wait 方法时,w 计数器的值会加一。当 v 计数器的值为 0 时,会唤醒所有的 waiter。
易错场景
使用 WaitGroup 需要注意以下易错场景:
- 计数器的值为负数会引发 panic;
- v 计数器增加的值大于减少的值,会造成一直阻塞。
示例代码
以下是一个使用 WaitGroup 的示例代码:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Done() 方法用于减少计数器
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1) // Add() 方法增加计数器
go worker(i, &wg)
}
wg.Wait() // Wait() 方法阻塞等待所有计数器为 0
fmt.Println("All workers done")
}
在上述代码中,main 函数创建了一个 WaitGroup 并启动了三个 goroutine,每个 goroutine 执行 worker 函数。在 worker 函数中,调用 wg.Done() 方法表示当前工作已经完成。main 函数中的 wg.Wait() 方法阻塞等待,直到所有的 goroutine 都完成工作并调用了 Done 方法。
小结
WaitGroup 是 Go 语言中非常有用的并发原语,用于等待一组 goroutine 完成。通过合理使用 Add、Done 和 Wait 方法,可以避免轮询等待带来的性能问题,并提高并发编排的效率。在使用 WaitGroup 时,需要注意计数器的增减操作,避免引发 panic 或长时间阻塞。
Channel
基本概念
Go 语言提倡通过通信来实现共享内存,而不是通过共享内存来通信。Go 的 CSP(Communicating Sequential Processes)并发模型正是通过 Goroutine 和 Channel 来实现的。Channel 是 Go 语言中用于 goroutine 之间通信的主要工具。
应用场景
Channel 有以下几类应用场景。
- 数据交互:通过 Channel 可以模拟并发的 Buffer 或者 Queue,实现生产者-消费者模式。
- 数据传递:通过 Channel 将数据传递给其他的 goroutine 进行处理。
- 信号通知:Channel 可以用于传递一些信号,如 close、data ready 等。
- 并发编排:通过 Channel 的阻塞等待机制,可以让一组 goroutine 按照一定的顺序并发或串行执行。
- 实现锁功能:通过 Channel 的阻塞等待机制,可以实现互斥锁的功能。
基本用法
Channel 有三种类型:
- 只能接收的 Channel:<-chan T。
- 只能发送的 Channel:chan<- T。
- 既能发送又能接收的 Channel:chan T。
Channel 通过 make 函数进行初始化,未初始化的 Channel 的零值是 nil,对 nil 的 Channel 进行接收或发送操作会导致阻塞。
Channel 可以分为有缓冲和无缓冲两种。无缓冲的 Channel 是同步的,有缓冲的 Channel 是异步的。发送操作只有在 Channel 满时才会阻塞,接收操作只有在 Channel 为空时才会阻塞。
发送操作是 chan<-,接收操作是 <-chan。接收数据时可以返回两个值,第一个是元素,第二个是一个布尔值,若为 false 则说明 Channel 已经被关闭并且 Channel 中没有缓存的数据。
Go 的内建函数 close、cap、len 都可以操作 Channel 类型,发送和接收都可以作为 select 语句的 case,Channel 也可以应用于 for range 语句。
实现原理
发送
在发送数据给 Channel 时,发送语句会转化为 chansend 函数:
- 如果 Channel 是 nil,调用者会被阻塞;
- 如果 Channel 已经关闭,发送操作会导致 panic;
- 如果 recvq 字段有 receiver,则将数据交给它,而不需要放入 buffer 中;
- 如果没有 receiver,则将数据放入 buffer 中;
- 如果 buffer 满了,则发送者 goroutine 会加入到 sendq 中阻塞休眠,直到被唤醒。
接收
在接收数据时,接收语句会转化为 chanrecv 函数:
- 如果 Channel 是 nil,调用者会被阻塞;
- 如果 Channel 已经被关闭,并且队列中无缓存元素,则返回 false 和一个对应元素的零值;
- 如果 sendq 中有 sender 并且 buffer 中有数据,则优先从 buffer 中取出,否则从 sendq 中弹出一个 sender,把它的数据复制给 receiver;
- 如果没有 sender,则从 buffer 中正常取一个元素;如果没有元素,则 receiver 会加入到 recvq 中阻塞等待,直到接收到数据或者 Channel 被关闭。
关闭
- 如果 Channel 是 nil,关闭 nil 的 Channel 会导致 panic。
- 如果关闭已经关闭的 Channel 也会导致 panic。
- 否则将 recvq 和 sendq 全部清除并唤醒。
示例代码
以下是一个使用 Channel 的示例代码:
package main
import (
"fmt"
"time"
)
// 生产者:生成数据并发送到 channel
func producer(ch chan<- int, count int) {
for i := 0; i < count; i++ {
ch <- i
fmt.Println("Produced:", i)
time.Sleep(time.Millisecond * 500)
}
close(ch) // 关闭 channel,表示生产结束
}
// 消费者:从 channel 接收数据并处理
func consumer(ch <-chan int) {
for data := range ch {
fmt.Println("Consumed:", data)
time.Sleep(time.Millisecond * 1000)
}
}
func main() {
ch := make(chan int, 5) // 创建一个带缓冲的 channel
go producer(ch, 10) // 启动生产者
consumer(ch) // 启动消费者
}
在上述代码中,main 函数创建了一个带缓冲的 Channel,并启动了一个生产者 goroutine 和一个消费者 goroutine。生产者不断生成数据并发送到 Channel 中,消费者从 Channel 中接收数据并进行处理。生产者完成后关闭 Channel,消费者则在接收到所有数据后结束。