Go sync.Cond条件变量的学习
1. 前言
1.1 sync.cond 可以用来干什么
Golang 的 sync 包中的 Cond 实现了一种条件变量,可以使用多个 Reader 等待公共资源。
每个 Cond 都会关联一个 Lock ,当修改条件或者调用 Wait 方法,必须加锁,保护 Condition。 有点类似 Java 中的 Wait 和 NotifyAll。
sync.Cond 条件变量是用来协调想要共享资源的那些 goroutine, 当共享资源的状态发生变化时,可以被用来通知被互斥锁阻塞的 gorountine。
1.2 条件变量与互斥锁
我们常常会把条件变量这个同步工具拿来与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。
条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。
你知道怎么使用条件变量吗?所以,我们今天的问题就是:条件变量怎样与互斥锁配合使用?
这道题的典型回答是:条件变量的初始化离不开互斥锁,并且它的方法有的也是基于互斥锁的。
条件变量提供的方法有三个:等待通知(wait)、单发通知(signal)和广播通知(broadcast)。
1、条件变量怎样与互斥锁配合使用?
(1)sync.Cond 基于互斥锁/读写锁,它和互斥锁的区别是什么呢?
- 互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。
- sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
我们想象一个非常简单的场景:
有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。
这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。
Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。
2. 夯实基础
2.1 整体流程
主流程我理解是这样的。流程: 外部函数加锁 -> 判断条件变量->wait内部解锁->阻塞等待信号->wait内部加锁-> 修改条件变量-> 外部解锁-> 触发信号。 第一次加解锁是为了保证读条件变量时它不会被修改, wait解锁是为了条件变量能够被其他线程改变。wait内部再次加锁,是对条件变量的保护,因为外部要修改。
2.2 条件变量的Wait方法
在了解了条件变量的使用方式之后,你可能会有这么几个疑问。
- 为什么先要锁定条件变量基于的互斥锁,才能调用它的Wait方法?
- 为什么要用for语句来包裹调用其Wait方法的表达式,用if语句不行吗?
这些问题我在面试的时候也经常问。你需要对这个Wait方法的内部机制有所了解才能回答上来。
条件变量的Wait方法主要做了四件事。
- 把调用它的 goroutine(也就是当前的 goroutine)加入到当前条件变量的通知队列中。
- 解锁当前的条件变量基于的那个互斥锁。
- 让当前的 goroutine 处于等待状态,等到通知到来时再决定是否唤醒它。此时,这个 goroutine 就会阻塞在调用这个Wait方法的那行代码上。
- 如果通知到来并且决定唤醒这个 goroutine,那么就在唤醒它之后重新锁定当前条件变量基于的互斥锁。自此之后,当前的 goroutine 就会继续执行后面的代码了。
你现在知道我刚刚说的第一个疑问的答案了吗?
因为条件变量的Wait方法在阻塞当前的 goroutine 之前,会解锁它基于的互斥锁,所以在调用该Wait方法之前,我们必须先锁定那个互斥锁,否则在调用这个Wait方法时,就会引发一个不可恢复的 panic。
为什么条件变量的Wait方法要这么做呢?你可以想象一下,如果Wait方法在互斥锁已经锁定的情况下,阻塞了当前的 goroutine,那么又由谁来解锁呢?别的 goroutine 吗?
先不说这违背了互斥锁的重要使用原则,即:成对的锁定和解锁,就算别的 goroutine 可以来解锁,那万一解锁重复了怎么办?由此引发的 panic 可是无法恢复的。
如果当前的 goroutine 无法解锁,别的 goroutine 也都不来解锁,那么又由谁来进入临界区,并改变共享资源的状态呢?只要共享资源的状态不变,即使当前的 goroutine 因收到通知而被唤醒,也依然会再次执行这个Wait方法,并再次被阻塞。
所以说,如果条件变量的Wait方法不先解锁互斥锁的话,那么就只会造成两种后果:不是当前的程序因 panic 而崩溃,就是相关的 goroutine 全面阻塞。
再解释第二个疑问。很显然,if语句只会对共享资源的状态检查一次,而for语句却可以做多次检查,直到这个状态改变为止。那为什么要做多次检查呢?
这主要是为了保险起见。如果一个 goroutine 因收到通知而被唤醒,但却发现共享资源的状态,依然不符合它的要求,那么就应该再次调用条件变量的Wait方法,并继续等待下次通知的到来。
2.3 条件变量的Signal方法和Broadcast方法
条件变量的Signal方法和Broadcast方法都是被用来发送通知的,不同的是,前者的通知只会唤醒一个因此而等待的 goroutine,而后者的通知却会唤醒所有为此等待的 goroutine。
条件变量的Wait方法总会把当前的 goroutine 添加到通知队列的队尾,而它的Signal方法总会从通知队列的队首开始,查找可被唤醒的 goroutine。所以,因Signal方法的通知,而被唤醒的 goroutine 一般都是最早等待的那一个。
这两个方法的行为决定了它们的适用场景。如果你确定只有一个 goroutine 在等待通知,或者只需唤醒任意一个 goroutine 就可以满足要求,那么使用条件变量的Signal方法就好了。
否则,使用Broadcast方法总没错,只要你设置好各个 goroutine 所期望的共享资源状态就可以了。
此外,再次强调一下,与Wait方法不同,条件变量的Signal方法和Broadcast方法并不需要在互斥锁的保护下执行。恰恰相反,我们最好在解锁条件变量基于的那个互斥锁之后,再去调用它的这两个方法。这更有利于程序的运行效率。
最后,请注意,条件变量的通知具有即时性。也就是说,如果发送通知的时候没有 goroutine 为此等待,那么该通知就会被直接丢弃。在这之后才开始等待的 goroutine 只可能被后面的通知唤醒。
3. 应用实践
一句话总结:sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。
4. 源码学习
4.1 sync.Cond 的四个方法
sync.Cond 内部维护了一个等待队列,队列中存放的是所有在等待这个 sync.Cond 的 Go 程,即保存了一个通知列表。sync.Cond 可以用来唤醒一个或所有因等待条件变量而阻塞的 Go 程,以此来实现多个 Go 程间的同步。
sync.Cond 的定义如下:
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {noCopy noCopy// L is held while observing or changing the conditionL Lockernotify notifyListchecker copyChecker
}
每个 Cond 实例都会关联一个锁 L(互斥锁 *Mutex,或读写锁 *RWMutex),当修改条件或者调用 Wait 方法时,必须加锁。
和 sync.Cond 相关的有如下几个方法:
1. NewCond 创建实例
NewCond 创建 Cond 实例时,需要关联一个锁。
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {return &Cond{L: l}
}
2. Broadcast 广播唤醒所有
Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {c.checker.check()runtime_notifyListNotifyAll(&c.notify)
}
3. Signal 唤醒一个协程
Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {c.checker.check()runtime_notifyListNotifyOne(&c.notify)
}
4. Wait 等待
调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {c.checker.check()t := runtime_notifyListAdd(&c.notify)c.L.Unlock()runtime_notifyListWait(&c.notify, t)c.L.Lock()
}
对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for 能够确保条件符合要求后,再执行后续的代码。
c.L.Lock()
for !condition() {c.Wait()
}
... make use of condition ...
c.L.Unlock()
4.2 使用实例
1. 3个wait1个Broadcast()
接下来我们实现一个简单的例子,三个协程调用 Wait() 等待,另一个协程调用 Broadcast() 唤醒所有等待的协程。
var done = falsefunc read(name string, c *sync.Cond) {c.L.Lock()for !done {c.Wait()}log.Println(name, "starts reading")c.L.Unlock()
}func write(name string, c *sync.Cond) {log.Println(name, "starts writing")time.Sleep(time.Second)c.L.Lock()done = truec.L.Unlock()log.Println(name, "wakes all")c.Broadcast()
}func main() {cond := sync.NewCond(&sync.Mutex{})go read("reader1", cond)go read("reader2", cond)go read("reader3", cond)write("writer", cond)time.Sleep(time.Second * 3)
}
- done 即互斥锁需要保护的条件变量。
- read() 调用 Wait() 等待通知,直到 done 为 true。
- write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
- write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。
运行结果如下:
$ go run main.go
2021/01/14 23:18:20 writer starts writing
2021/01/14 23:18:21 writer wakes all
2021/01/14 23:18:21 reader2 starts reading
2021/01/14 23:18:21 reader3 starts reading
2021/01/14 23:18:21 reader1 starts reading
writer 接收数据花费了 1s,同步通知所有等待的协程。
5. 总结
我们今天主要讲了条件变量,它是基于互斥锁的一种同步工具。在 Go 语言中,我们需要用sync.NewCond函数来初始化一个sync.Cond类型的条件变量。
sync.NewCond函数需要一个sync.Locker类型的参数值。sync.Mutex类型的值以及sync.RWMutex类型的值都可以满足这个要求。另外,后者的RLocker方法可以返回这个值中的读锁,也同样可以作为sync.NewCond函数的参数值,如此就可以生成与读写锁中的读锁对应的条件变量了。
条件变量的Wait方法需要在它基于的互斥锁保护下执行,否则就会引发不可恢复的 panic。此外,我们最好使用for语句来检查共享资源的状态,并包裹对条件变量的Wait方法的调用。
不要用if语句,因为它不能重复地执行“检查状态 - 等待通知 - 被唤醒”的这个流程。重复执行这个流程的原因是,一个“因为等待通知,而被阻塞”的 goroutine,可能会在共享资源的状态不满足其要求的情况下被唤醒。
条件变量的Signal方法只会唤醒一个因等待通知而被阻塞的 goroutine,而它的Broadcast方法却可以唤醒所有为此而等待的 goroutine。后者比前者的适应场景要多得多。
这两个方法并不需要受到互斥锁的保护,我们也最好不要在解锁互斥锁之前调用它们。还有,条件变量的通知具有即时性。当通知被发送的时候,如果没有任何 goroutine 需要被唤醒,那么该通知就会立即失效。
再强调一下。对于同一个锁,多个 goroutine 对它重复锁定时只会有一个成功,其余的会阻塞;多个 goroutine 对它重复解锁时也只会有一个成功,但其余的会抛 panic。
相关资料
- go sync.cond 源码
- https://geektutu.com/ go sync.cond
- 关于 sync.Cond 的讨论可参考 How to correctly use sync.Cond? - StackOverflow
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
