goroutine 的调度
操作系统线程、逻辑处理器和 goroutine
goroutine 是 Go 赖以支持并发的重要特性,Go 的运行时在一组逻辑处理器上调度 goroutine,每一个逻辑处理器都对应一个操作系统线程。
我们通过几种常见的情况来看看在 Go 的调度下 goroutine、逻辑处理器和操作系统线程的行为。
最普通的情况,当我们创建一个 goroutine 并准备运行它,这个 goroutine 会进入调度器的全局执行队列中,调度器会将队列中的 goroutine 分配给逻辑处理器来运行,goroutine 此时进入到逻辑处理器的本地执行队列中等待处理。下图说明了这种情况。
更复杂一些,有时 goroutine 会被系统调用阻塞,这时,逻辑处理器会与该 goroutine 及执行该 goroutine 的线程分离,如下图所示。此时,这个逻辑处理器会新建一个线程,并从本地执行队列中选取另一个 goroutine 来执行。当先前阻塞的 goroutine 准备好时,它会回到本地执行队列,多余的线程会保存好备用。
另一种常见的情况是 goroutine 中需要进行网络调用,此时 goroutine 与逻辑处理器也会分离,goroutine 会被到带网络轮询器的运行时上,直到网络调用完成之后,重新分配给逻辑处理器。
goroutine 的暂停与重新调度
一个 goroutine 在运行完毕之前,调度器可以暂停它的运行并随后重新调度它。我们通过下面这段程序来观察一下这种切换行为。
我们通过 runtime.GOMAXPROCS(1)
来限定只使用一个逻辑调度器,并在这个逻辑调度器上创建两个 goroutine,每一个 goroutine 中会执行一些比较耗时间的操作。
在 Go Playground 运行这段代码:https://go.dev/play/p/T3Clh4e-cH-
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
// 限定只使用一个逻辑调度器
runtime.GOMAXPROCS(1)
wg.Add(2)
go longTimeFunc("A")
go longTimeFunc("B")
wg.Wait()
}
func longTimeFunc(name string){
defer wg.Done()
for i := 0; i < 100; i++ {
time.Sleep(time.Duration(500)*time.Millisecond)
fmt.Println(name)
}
}
运行结果:
B
A <- 切换 goroutine
A
B <- 切换 goroutine
B
...(省略 195 行)
在这个执行结果中可以看到,在我们设置的唯一逻辑处理器上,两个 goroutine 切换运行。当然,每一次的执行结果会有所变化。
并行任务
上面的例子中只使用了一个逻辑处理器,接下来,我们来观察使用多个逻辑处理器来并行地来处理任务。
这里再来明晰一下并发与并行的概念。直观上,并发(concurrency)和并行(parallelism)都是在说“同时处理多件事情”,但它们并不是相同的概念:并行强调的是同时做多件事情,某一时刻,多个事情同时在发生,而并发强调同时管理多件事情,这些事情可能是轮流做的,某一时刻,可能只有一件事情在发生。
打一个比方,并发是诊室里只有一个医生,他一会儿为病人 A 检查,一会儿为病人 B 开药,一会儿又回来看 A 的检查结果。而并行是诊室里有多个医生,每个医生同时在工作,各自负责一些病人。
对应到我们在谈论的 Go 上,我们的第一个例子中,多个 goroutine 是并发地在一个逻辑处理器上执行的。Go 1.5 之后,Go 运行时默认为每一个可用的物理处理器分配一个逻辑处理器,多个逻辑处理器会并行地执行任务。
多个逻辑处理器不一定会带来更好的性能,反之,单个逻辑处理器也不意味着低效率——Go 1.5 之前的版本上,一个 Go 程序只会分配一个逻辑处理器。
下面这段程序中,我们准备了两个 goroutine,它们分别会打印 100 个 A 和 100 个 B。首先我们还是像之前一样限定使用一个逻辑调度器。
在 Go Playground 运行这段代码:https://go.dev/play/p/QoqcDsEX4t7
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
// 限定只使用一个逻辑调度器
runtime.GOMAXPROCS(1)
var wg sync.WaitGroup
wg.Add(2)
// goroutine A
go func() {
defer wg.Done()
for i := 1; i <= 100; i++ {
fmt.Printf("A ")
}
}()
// goroutine B
go func() {
defer wg.Done()
for i := 1; i <= 100; i++ {
fmt.Printf("B ")
}
}()
wg.Wait()
}
运行结果:
B B B B B …(省略 90 个 B)… B B B B B A A A A A …(省略 90 个 A)… A A A A A
根据上面的运行结果可以分析到,在我们设置的唯一的逻辑调度器上,只发生了一次 goroutine 切换,因此呈现的是 100 个 B 后面跟着 100 个 A。这可以认为是因为打印这些字符需要的时间很短,所以没等待来回切换,两个 goroutine 就运行完了。
现在我们把逻辑处理器的数量调整到 2,即 runtime.GOMAXPROCS(2)
,其他代码没有变动,这里不再重复。
在 Go Playground 运行这段代码:https://go.dev/play/p/ZLnnjS3kMNr
运行结果:
B B B B A A A A A B B B B B B B A A A ...(后续字符省略)
可以看到,A 和 B 是混合着打印出来的,这是因为两个 goroutine 是分别在两个逻辑处理器上并行运行的(这个运行结果是在多核心的 CPU 上得到的,即两个逻辑处理器分别在两个物理处理器上)。当然,每次运行的结果会有所变化,不同的环境也会对运行结果造成影响。
共享资源与竞态
竞态
程序中总存在一些需要被多个 goroutine 同时访问并操作的共享资源,他可能是一个变量,可能是一个文件。如果不加同步控制去访问、操作这些资源,就会产生竞态。竞态会导致许多潜在的问题,我们通过下面这段程序制造竞态,来观察并分析问题是如何产生的。
这段程序中启动了两个 goroutine,分别对 shared
做 100000 次 +1 的操作。
在 Go Playground 运行这段代码:https://go.dev/play/p/AuauJtSkykF
package main
import (
"fmt"
"sync"
)
var shared int
var wg sync.WaitGroup
func main() {
wg.Add(2)
go inc("A")
go inc("B")
wg.Wait()
fmt.Printf("%d\n", shared)
}
func inc(name string) {
defer wg.Done()
for i := 1; i <= 100000; i++ {
shared = shared + 1
}
}
运行结果:
107197
每次运行的结果会有所不同,最终的结果大概率不是 200000。这是因为 shared = shared + 1
不是一个原子操作,如果一个 goroutine 在一次自增未完全完成的时候被切换,那么另一个 goroutine 就会在原值上继续操作。也就是说,有一些自增操作被“覆盖”了。
Go 1.1 之后加入了竞态检测器,可以在测试程序(go test
)、编译并运行程序(go run
)、构建(go build
)、安装包(go install
)时加上 -race
标志,竞态检测器会监视内存访问、检测对共享变量的非同步访问,如果检测到竞争行为,会给出警告。
例如,对上面的程序使用 -race
标志:
go run -race main.go
得到下面的结果
==================
WARNING: DATA RACE
Read at 0x0000009c1e90 by goroutine 8:
main.inc()
/foo/main.go:24 +0x84
Previous write at 0x0000009c1e90 by goroutine 7:
main.inc()
/foo/main.go:24 +0xa4
Goroutine 8 (running) created at:
main.main()
/foo/main.go:15 +0xa8
Goroutine 7 (running) created at:
main.main()
/foo/main.go:14 +0x7b
==================
200000
Found 1 data race(s)
exit status 66
Go 的竞态检测器为我们返回了引发竞态的 goroutine 和对应代码所在的位置。
更多竞态检测器的信息可以参考 Go 官方博客 Introducing the Go Race Detector。
既然竞态会导致各种潜在的问题,我们就要着手来应对它。Go 支持传统的同步机制——加锁,也有一种具有 Go 特色的同步机制——通道,我们先来看前者,它又可以分为两种形式——原子函数和互斥锁。
原子函数和互斥锁
原子函数
Go 在 atomic
中提供了一些原子函数,它们通过很底层的锁机制获得原子能力。例如,可以用原子函数 AddInt64
来改写上面的代码,如下所示。
在 Go Playground 运行这段代码:https://go.dev/play/p/RzHIWBVQjda
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var shared int64
var wg sync.WaitGroup
func main() {
wg.Add(2)
go inc("A")
go inc("B")
wg.Wait()
fmt.Printf("%d\n", shared)
}
func inc(name string) {
defer wg.Done()
for i := 1; i <= 100000; i++ {
atomic.AddInt64(&shared, 1)
}
}
运行结果:
200000
我们用 atomic.AddInt64(&shared, 1)
替换了原来的 shared = shared + 1
,这个原子函数会保证同一时刻只有一个 goroutine 对 shared
进行并完成加法操作。
atomic
包提供原子交换(Swap*
)、原子 CAS(CompareAndSwap*
)、原子加(Add*
)、原子取(Load*
)、原子存(Store*
)等一系列原子能力。
互斥锁
Go 在 sync
包中提供了 Mutex
类型,即互斥锁。互斥锁可以是一段代码成为临界区,同一时刻只允许一个 goroutine 进入。
我们用互斥锁来改写之前的代码,如下所示:
在 Go Playground 运行这段代码:https://go.dev/play/p/JFuMXZ1VhoY
package main
import (
"fmt"
"sync"
)
var shared int
var wg sync.WaitGroup
var mutex sync.Mutex
func main() {
wg.Add(2)
go inc("A")
go inc("B")
wg.Wait()
fmt.Printf("%d\n", shared)
}
func inc(name string) {
defer wg.Done()
for i := 1; i <= 100000; i++ {
mutex.Lock()
shared = shared + 1 // 临界区
mutex.Unlock()
}
}
运行结果:
200000
在 shared = shared + 1
操作之前,我们对 mutex
上锁,在完成自增操作后解锁,这样就是使 shared = shared + 1
这一行成为了临界区,只有一个 goroutine 能够进入,也就避免了竞态带来的问题。
原子函数和互斥锁是传统且常见的同步机制,除此之外,Go 还提供另一种同步机制——通道。
通道
无缓冲通道
通道机制基于通信顺序进程(Communicating Sequential Process, CSP)这种消息传递模型。
正如“通道”这个名字所言,它是用来传递东西的。再来回顾一下,我们面临的问题是:有多个 goroutine 都要使用一个共享资源,但同时只能有一个 goroutine 使用这个资源。来看看通道机制如何做到这件事情。
让我们想象,现在有两个小人 Alice 和 Bob,分别站在两面悬崖上,它们都需要有一份共享的物品,现在由 Alice 持有,Bob 无法使用,Alice 使用完毕之后,要想办法给 Bob。
他们想到了一个好办法,准备一条绳子,Alice 和 Bob 各牵着绳子的一头,把这份物品从绳子上“滑”过去。
以此类比,Alice 和 Bob 就是两个 goroutine,他们之间用来从传递物品的绳子就是通道,更精确一些——这是一个无缓冲通道。也就是说,通道本身没有能力存储值,这就要求发送方和接受方同时准备好。就好像 Alice 和 Bob 的绳子一样,如果其中有一头没人牵着,那就没法传递物品,像下面这样:
如果发送、接收的两个 goroutine 没有都准备好,无缓冲通道就会导致其中一方阻塞等待,直到双方同时准备好为止,所以无缓冲通道的发送和接受动作本身就是同步的。下面我们用 Go 代码来模拟一下 Alice 和 Bob 来回传递物品的动作。
在 Go Playground 运行这段代码:https://go.dev/play/p/XxIx89bG1s8
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
rand.Seed(time.Now().Unix())
wg.Add(2)
// 创建 int 类型的无缓冲通道
rope := make(chan int)
go transfer("Alice", rope)
go transfer("Bob", rope)
rope <- 0
wg.Wait()
}
func transfer(name string, rope chan int) {
defer wg.Done()
for {
// 从绳子上拿取物品
times, ok := <-rope
if !ok {
fmt.Printf("%s 结束接收、n", name)
return
}
// 以一定概率结束传送
if n := rand.Intn(100); n > 80 {
fmt.Printf("%s 收到物品并结束发送,物品共被传送 %d 次、n", name, times)
close(rope)
return
}
times++
fmt.Printf("%s 收到物品,并开始物品的第 %d 次传送、n", name, times)
rope <- times
}
}
在 Go 中,需要用 make
来声明通道,声明通道时需要指定通道中允许传输的数据类型。上面的代码中,使用 rope := make(chan int)
创建了一个允许共享 int 类型数据的无缓冲通道。
随后,为 Alice 和 Bob 各启动了一个 goroutine,执行 transfer
操作。在 transfer
函数中,我们使用 <-rope
符号从通道中接收值,使用 rope <-
向通道中发送值。<-
符号和通道名称的位置与数据的流向是一致的。
上面这段程序的运行结果如下:
Bob 收到物品,并开始物品的第 1 次传送
Alice 收到物品,并开始物品的第 2 次传送
Bob 收到物品,并开始物品的第 3 次传送
Alice 收到物品,并开始物品的第 4 次传送
Bob 收到物品,并开始物品的第 5 次传送
Alice 收到物品并结束发送,物品共被传送 5 次
Bob 结束接收
这个结果符合我们的预期。
有缓冲通道
除了无缓冲通道,Go 还提供有缓冲通道,有缓冲通道在被接收前能存储一个或多个值。
还是用悬崖上的 Alice 和 Bob 来举例,他们之间现在建起了一条传送带!这意味着在一方准备好接收之前,共享资源可以存放在传送带上,发送者可以去干别的事情。
前面的无缓冲通道会在收发两方任一一方没准备好的时候阻塞,那么有缓冲通道的阻塞条件是什么呢?举个例子来看,这个例子中我们让 Alice 来做发送者,Bob 做接收者,传送带可以容纳三件物品放在上面。
我们用这个例子模拟了一个容量为 3 的有缓冲通道,它在没有空间继续容纳新的值时阻塞发送动作,在通道中没有值可以获取时阻塞接收动作。
我们用通道来改写一下之前多个 goroutine 各自自增 100000 次的程序。
在 Go Playground 运行这段代码:https://go.dev/play/p/A9J8Q2S-E7P
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
wg.Add(2)
shared := make(chan int, 1)
shared <- 0
go inc("A", shared)
go inc("B", shared)
wg.Wait()
close(shared)
fmt.Printf("%d\n", <-shared)
}
func inc(name string, shared chan int) {
defer wg.Done()
for i := 1; i <= 100000; i++ {
cur := <-shared
cur++
shared <- cur
}
}
运行结果:
200000
在这段代码中使用了一个带 1 个容量的有缓冲通道。有缓冲通道的声明是在 make
的时候带上容量值,例如 shared := make(chan int, 1)
。值得关注的是在第 21 行和 22 行,通道先被关闭了,随后我们又从中读出了值。这是因为,通道被关闭之后,仍可以从中接收数据,但不可以继续发送数据。
2022年1月1日 at 下午5:16
很久没见过这种好文,博主tql