谈谈go中sync.cond的用法和理解
引入
不管从说明还是用法来看,Cond都是sync包中最难理解和使用的功能。golang官方注释中说,“Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.”,即Cond是多goroutine协同工作的工具,有的goroutine需要等待某些条件(waiting),有的goroutine使条件成立(announcing)。然而,Cond的用法比起sync包中其他组件来说非常复杂,仅仅调用Cond.Wait或Cond.Signal是不够的,还需要调用Cond.Locker.Lock和Cond.Locker.Unlock。本文将探讨,为什么要这样使用Cond,以及使用Cond的过程中一些要注意的点。
Cond的工作原理
条件和信号
使用Cond的场景中,需要区分条件和信号,这两者经常被混淆。wait等待的是信号,signal发送的是信号,这两个指令均与条件无关。
条件是对临界区资源的判断,某协程修改临界区后,条件可能成立,发送信号,等待该信号的协程接受到信号,判定条件是否成立,如果成立,执行后续操作,否则继续等待。信号只是一种通知,告知与信号对应的条件可能成立。信号和条件不是一一对应的,一个条件一定要有一个信号,但一个信号可以对应到多个条件。比如协程A等待条件为队列未满,协程B等待条件为队列包含整数256,这两个协程的条件对应一个临界区,可以用一个信号通知多个条件判定。
读写者示例
假设有两个goroutine,一个goroutine向一个大小为2的队列中插入元素(后简称写者),另一个goroutine从前面这个队列中读取元素(后简称读者),读者每1秒从队列中取出一个元素,写者要尽可能快的向队列中插入元素。这个场景可以用channel简单实现,但这里用Cond来实现。
我们先用以下代码来理解这个场景,便于后续逐渐深入理解go语言的实现。下列伪代码中,wait和signal是Cond的指令。
package main import ( "fmt" "sync" "time" ) func main() { wait := sync.WaitGroup{} locker := new(sync.Mutex) cond := sync.NewCond(locker) for i := 0; i < 3; i++ { go func(i int) { defer wait.Done() wait.Add(1) cond.L.Lock()//获取锁 fmt.Println("等待开始...") cond.Wait()//等待通知,阻塞当前goroutine,这里会被阻塞,后面不会被执行,直到发送Signal唤醒 fmt.Println("等待结束...") cond.L.Unlock()// // 记得要释放锁 fmt.Println("Goroutine run. Number:", i) }(i) } // 因为是3个协程,所以我们调用3次sleep 发送Signal信号 time.Sleep(time.Second * 1) // 睡眠1秒,使所有goroutine进入 Wait 阻塞状态 cond.Signal() time.Sleep(time.Second * 1) // 睡眠1秒,使所有goroutine进入 Wait 阻塞状态 cond.Signal() time.Sleep(time.Second * 1) // 睡眠1秒,使所有goroutine进入 Wait 阻塞状态 cond.Signal() wait.Wait() }
输出:
等待开始... 等待开始... 等待开始... 等待结束... Goroutine run. Number: 0 等待结束... Goroutine run. Number: 1 等待结束... Goroutine run. Number: 2
我们分析下源码sync.Cond详细的理解:
结构体
type Cond struct { noCopy noCopy // noCopy可以嵌入到结构中,在第一次使用后不可复制,使用go vet作为检测使用 L Locker // 根据需求初始化不同的锁,如*Mutex 和 *RWMutex notify notifyList // 通知列表,调用Wait()方法的goroutine会被放入list中,每次唤醒,从这里取出 checker copyChecker // 复制检查,检查cond实例是否被复制 }
相关函数
Cond函数
type Cond struct { // noCopy可以嵌入到结构中,在第一次使用后不可复制,使用go vet作为检测使用 noCopy noCopy // L is held while observing or changing the condition, // 可以根据需求初始化成不同的锁 L Locker //// 通知列表,调用Wait()方法的goroutine会被放入list中,每次唤醒,从这里取出 notify notifyList // 复制检查,检查cond实例是否被复制 checker copyChecker }
notifyList函数
type notifyList struct { wait uint32 notify uint32 lock uintptr head unsafe.Pointer tail unsafe.Pointer }
NewCond函数
func NewCond(l Locker) *Cond { return &Cond{L: l} }
Wait函数
func (c *Cond) Wait() { // 检查c是否是被复制的,如果是就panic c.checker.check() // 将当前goroutine加入等待队列 t := runtime_notifyListAdd(&c.notify) // 解锁 c.L.Unlock() // 等待队列中的所有的goroutine执行等待唤醒操作 runtime_notifyListWait(&c.notify, t) c.L.Lock() }
Signal函数
func (c *Cond) Signal() { // 检查c是否是被复制的,如果是就panic c.checker.check() // 通知等待列表中的一个 runtime_notifyListNotifyOne(&c.notify) }
Broadcast函数
func (c *Cond) Broadcast() { // 检查c是否是被复制的,如果是就panic c.checker.check() // 唤醒等待队列中所有的goroutine runtime_notifyListNotifyAll(&c.notify) }
package main import ( "fmt" "sync" "time" ) var locker = new(sync.Mutex) var cond = sync.NewCond(locker) func main() { for i := 0; i < 5; i++ { go func(x int) { cond.L.Lock() //获取锁 defer cond.L.Unlock() //释放锁 cond.Wait() //等待通知,阻塞当前goroutine fmt.Println("Goroutine run. Number:", x) time.Sleep(time.Second * 1) }(i) } time.Sleep(time.Second * 1) fmt.Println("Signal...") cond.Signal() // 下发一个通知给已经获取锁的goroutine time.Sleep(time.Second * 1) cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine time.Sleep(time.Second * 3) cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine fmt.Println("Broadcast...") time.Sleep(time.Second * 60)// time.Sleep(time.Minute) }
输出:
Signal... Goroutine run. Number: 0 Goroutine run. Number: 1 Broadcast... Goroutine run. Number: 3 Goroutine run. Number: 4 Goroutine run. Number: 2
总结
调用Cond.Wait前调用Cond.Locker.Lock的原因是,与Wait一起,建立一个分阶段进入临界区的原子性事务。这是非常有意思的实现,常见的事务,会从头至尾对临界区加锁,
而Cond的存在,使得不退出函数的情况下,缩小临界区加锁时间,对程序并行意义重大。