Go语言面试题精讲

Olivia的小跟班 Lv3

题记

Golang面试前用于突击的八股文。

Go基础

Go 程序的基本结构?

img

Go 有哪些关键字?

img

Go 有哪些数据类型?

img

Go 方法与函数的区别?

image-20230828192956848

Go 方法值接收者和指针接收者的区别?

如果方法的接收者是指针类型,无论调用者是对象还是对象指针,修改的都是对象本身,会影响调用者;

如果方法的接收者是值类型,无论调用者是对象还是对象指针,修改的都是对象的副本,不影响调用者;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import "fmt"

type Person struct {
age int
}

// 如果实现了接收者是指针类型的方法,会隐含地也实现了接收者是值类型的IncrAge1方法。
// 会修改age的值
func (p *Person) IncrAge1() {
p.age += 1
}

// 如果实现了接收者是值类型的方法,会隐含地也实现了接收者是指针类型的IncrAge2方法。
// 不会修改age的值
func (p Person) IncrAge2() {
p.age += 1
}

// 如果实现了接收者是值类型的方法,会隐含地也实现了接收者是指针类型的GetAge方法。
func (p Person) GetAge() int {
return p.age
}

func main() {
// p1 是值类型
p := Person{age: 10}

// 值类型 调用接收者是指针类型的方法
p.IncrAge1()
fmt.Println(p.GetAge())
// 值类型 调用接收者是值类型的方法
p.IncrAge2()
fmt.Println(p.GetAge())

// ----------------------

// p2 是指针类型
p2 := &Person{age: 20}

// 指针类型 调用接收者是指针类型的方法
p2.IncrAge1()
fmt.Println(p2.GetAge())
// 指针类型 调用接收者是值类型的方法
p2.IncrAge2()
fmt.Println(p2.GetAge())
}
/*
11
11
21
21
*/

上述代码中:

实现了接收者是指针类型的 IncrAge1 函数,不管调用者是值类型还是指针类型,都可以调用IncrAge1方法,并且它的 age 值都改变了。

实现了接收者是指针类型的 IncrAge2 函数,不管调用者是值类型还是指针类型,都可以调用IncrAge2方法,并且它的 age 值都没有被改变。

通常我们使用指针类型作为方法的接收者的理由

  • 使用指针类型能够修改调用者的值。
  • 使用指针类型可以避免在每次调用方法时复制该值,在值的类型为大型结构体时,这样做会更加高效。

Go 函数返回局部变量的指针是否安全?

一般来说,局部变量会在函数返回后被销毁,因此被返回的引用就成为了”无所指”的引用,程序会进入未知状态。

但这在 Go 中是安全的,Go 编译器将会对每个局部变量进行逃逸分析。如果发现局部变量的作用域超出该函数,则不会将内存分配在栈上,而是分配在堆上,因为他们不在栈区,即使释放函数,其内容也不会受影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func add(x, y int) *int {
res := 0
res = x + y
return &res
}

func main() {
fmt.Println(add(1, 2))
}

这个例子中,函数 add 局部变量 res 发生了逃逸。res作为返回值,在 main 函数中继续使用,因此 res 指向的内存不能够分配在栈上,随着函数结束而回收,只能分配在堆上。

编译时可以借助选项 -gcflags=-m,查看变量逃逸的情况

1
2
3
4
5
6
7
./main.go:6:2: res escapes to heap:
./main.go:6:2:   flow: ~r2 = &res:
./main.go:6:2:     from &res (address-of) at ./main.go:8:9
./main.go:6:2:     from return &res (return) at ./main.go:8:2
./main.go:6:2: moved to heap: res
./main.go:12:13: ... argument does not escape
0xc0000ae008

res escapes to heap 即表示 res 逃逸到堆上了。

Go 函数参数传递到底是值传递还是引用传递?

先说下结论:

Go语言中所有的传参都是值传递(传值),都是一个副本,一个拷贝。

参数如果是非引用类型(int、string、struct等这些),这样就在函数中就无法修改原内容数据;如果是引用类型(指针、map、slice、chan等这些),这样就可以修改原内容数据。

是否可以修改原内容数据,和传值、传引用没有必然的关系。在C++中,传引用肯定是可以修改原内容数据的,在Go语言里,虽然只有传值,但是我们也可以修改原内容数据,因为参数是引用类型

引用类型和引用传递是2个概念,切记!!!

什么是值传递?

将实参的值传递给形参,形参是实参的一份拷贝,实参和形参的内存地址不同。函数内对形参值内容的修改,是否会影响实参的值内容,取决于参数是否是引用类型

什么是引用传递?

将实参的地址传递给形参,函数内对形参值内容的修改,将会影响实参的值内容。Go语言是没有引用传递的,在C++中,函数参数的传递方式有引用传递。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"

func main() {
m := make(map[string]int)
m["age"] = 8

fmt.Printf("原始map的内存地址是:%p\n", &m)
modifyMap(m)
fmt.Printf("改动后的值是: %v\n", m)
}

func modifyMap(m map[string]int) {
fmt.Printf("函数里接收到map的内存地址是:%p\n", &m)
m["age"] = 9
}
/*
原始map的内存地址是:0xc00000e028
函数里接收到map的内存地址是:0xc00000e038
改动后的值是: map[age:9]
通过make函数创建的map变量本质是一个hmap类型的指针*hmap,所以函数内对形参的修改,会修改原内容数据(channel也如此)
*/

Go defer关键字的实现原理?

定义

defer 能够让我们推迟执行某些函数调用,推迟到当前函数返回前才实际执行。defer与panic和recover结合,形成了Go语言风格的异常与捕获机制。

使用场景

defer 语句经常被用于处理成对的操作,如文件句柄关闭、连接关闭、释放锁

优点:

方便开发者使用

缺点:

有性能损耗

实现原理

Go1.14中编译器会将defer函数直接插入到函数的尾部,无需链表和栈上参数拷贝,性能大幅提升。把defer函数在当前函数内展开并直接调用,这种方式被称为open coded defer

源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func A(i int) {
defer A1(i, 2*i)
if(i > 1) {
defer A2("Hello", "eggo")
}
// code to do something
return
}
func A1(a,b int) {
//......
}
func A2(m,n string) {
//......
}

编译后(伪代码):

1
2
3
4
5
6
7
8
func A(i int) {
// code to do something
if(i > 1){
A2("Hello", "eggo")
}
A1(i, 2*i)
return
}

代码示例

1、函数退出前,按照先进后出的顺序,执行defer函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import "fmt"

// defer:延迟函数执行,先进后出
func main() {
defer fmt.Println("defer1")
defer fmt.Println("defer2")
defer fmt.Println("defer3")
defer fmt.Println("defer4")
fmt.Println("11111")
}

// 11111
// defer4
// defer3
// defer2
// defer1

2、panic后的defer函数不会被执行(遇到panic,如果没有捕获错误,函数会立刻终止)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "fmt"

// panic后的defer函数不会被执行
func main() {
defer fmt.Println("panic before")
panic("发生panic")
defer func() {
fmt.Println("panic after")
}()
}

// panic before
// panic: 发生panic

3、panic没有被recover时,抛出的panic到当前goroutine最上层函数时,最上层程序直接异常终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"

func F() {
defer func() {
fmt.Println("b")
}()
panic("a")
}

// 子函数抛出的panic没有recover时,上层函数时,程序直接异常终止
func main() {
defer func() {
fmt.Println("c")
}()
F()
fmt.Println("继续执行")
}

// b
// c
// panic: a

4、panic有被recover时,当前goroutine最上层函数正常执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import "fmt"

func F() {
defer func() {
if err := recover(); err != nil {
fmt.Println("捕获异常:", err)
}
fmt.Println("b")
}()
panic("a")
}

func main() {
defer func() {
fmt.Println("c")
}()
F()
fmt.Println("继续执行")
}

// 捕获异常: a
// b
// 继续执行
// c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import "fmt"

func main() {

defer func() {
if v := recover();v == 11 {
fmt.Println("v:",v)
}
fmt.Printf("defer1...")
}()

defer func() {
fmt.Printf("defer2...")
}()

array := [2]int{1,2}
fmt.Println("array: ",array[1])
panic(11)
}
//array: 2
//defer2...
//v: 11
//defer1...

5.执行过程是: 保存返回值(若有)–>执行defer(若有)–>执行ret跳转

1
2
3
4
5
6
7
func foo() (ret int) {
defer func() {
ret++
}()

return 0
}

6.延迟函数的参数在defer语句出现时就已经确定下来了

1
2
3
4
5
6
func a() {
i := 0
defer fmt.Println(i)
i++
return
}

注意:

执行顺序应该为panic、defer、recover

  • 发生panic的函数并不会立刻返回,而是先层层函数执行defer,再返回。如果有办法将panic捕获到panic,就正常处理(若是外部函数捕获到,则外部函数只执行defer),如果没有没有捕获,程序直接异常终止。
  • Go语言提供了recover内置函数。前面提到,一旦panic逻辑就会走到defer(defer必须在panic的前面!)。调用recover函数将会捕获到当前的panic,被捕获到的panic就不会向上传递了
  • 在panic发生时,在前面的defer中通过recover捕获这个panic,转化为错误通过返回值告诉方法调用者。

Go 内置函数make和new的区别?

首先纠正下make和new是内置函数,不是关键字

变量初始化,一般包括2步,变量声明 + 变量内存分配,var关键字就是用来声明变量的,new和make函数主要是用来分配内存的

var声明值类型的变量时,系统会默认为他分配内存空间,并赋该类型的零值

比如布尔、数字、字符串、结构体

如果指针类型或者引用类型的变量,系统不会为它分配内存,默认就是nil。此时如果你想直接使用,那么系统会抛异常,必须进行内存分配后,才能使用。

new 和 make 两个内置函数,主要用来分配内存空间,有了内存,变量就能使用了,主要有以下2点区别:

使用场景区别:

make 只能用来分配及初始化类型为slice、map、chan 的数据。

new 可以分配任意类型的数据,并且置零。

返回值区别:

make函数原型如下,返回的是slice、map、chan类型本身

这3种类型是引用类型,就没有必要返回他们的指针

1
func make(t Type, size ...IntegerType) Type

new函数原型如下,返回一个指向该类型内存地址的指针

1
func new(Type) *Type

Slice

Go slice的底层实现原理?

切片是基于数组实现的,它的底层是数组,可以理解为对 底层数组的抽象。

源码包中src/runtime/slice.go 定义了slice的数据结构:

1
2
3
4
5
type slice struct {
array unsafe.Pointer
len int
cap int
}

slice占用24个字节

array: 指向底层数组的指针,占用8个字节

len: 切片的长度,占用8个字节

cap: 切片的容量,cap 总是大于等于 len 的,占用8个字节

slice有4种初始化方式

1
2
3
4
5
6
7
8
9
10
11
// 初始化方式1:直接声明
var slice1 []int

// 初始化方式2:使用字面量
slice2 := []int{1, 2, 3, 4}

// 初始化方式3:使用make创建slice
slice3 := make([]int, 3, 5)

// 初始化方式4: 从切片或数组“截取”
slcie4 := arr[1:3]

通过一个简单程序,看下slice初始化调用的底层函数

1
2
3
4
5
6
7
8
9
package main

import "fmt"

func main() {
slice := make([]int, 0)
slice = append(slice, 1)
fmt.Println(slice, len(slice), cap(slice))
}

通过 go tool compile -S test.go | grep CALL 得到汇编代码

1
2
3
4
5
6
7
8
9
0x0042 00066 (test.go:6)        CALL    runtime.makeslice(SB)
0x006d 00109 (test.go:7) CALL runtime.growslice(SB)
0x00a4 00164 (test.go:8) CALL runtime.convTslice(SB)
0x00c0 00192 (test.go:8) CALL runtime.convT64(SB)
0x00d8 00216 (test.go:8) CALL runtime.convT64(SB)
0x0166 00358 ($GOROOT/src/fmt/print.go:274) CALL fmt.Fprintln(SB)
0x0180 00384 (test.go:5) CALL runtime.morestack_noctxt(SB)
0x0079 00121 (<autogenerated>:1) CALL runtime.efaceeq(SB)
0x00a0 00160 (<autogenerated>:1) CALL runtime.morestack_noctxt(SB)

初始化slice调用的是runtime.makeslice,makeslice函数的工作主要就是计算slice所需内存大小,然后调用mallocgc进行内存的分配

所需内存大小 = 切片中元素大小 * 切片的容量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func makeslice(et *_type, len, cap int) unsafe.Pointer {
mem, overflow := math.MulUintptr(et.size, uintptr(cap))
if overflow || mem > maxAlloc || len < 0 || len > cap {
// NOTE: Produce a 'len out of range' error instead of a
// 'cap out of range' error when someone does make([]T, bignumber).
// 'cap out of range' is true too, but since the cap is only being
// supplied implicitly, saying len is clearer.
// See golang.org/issue/4085.
mem, overflow := math.MulUintptr(et.size, uintptr(len))
if overflow || mem > maxAlloc || len < 0 {
panicmakeslicelen()
}
panicmakeslicecap()
}

return mallocgc(mem, et, true)
}

Go array和slice的区别?

1)数组长度不同

数组初始化必须指定长度,并且长度就是固定的

切片的长度是不固定的,可以追加元素,在追加时可能使切片的容量增大

2)函数传参不同

数组是值类型,将一个数组赋值给另一个数组时,传递的是一份深拷贝,函数传参操作都会复制整个数组数据,会占用额外的内存,函数内对数组元素值的修改,不会修改原数组内容。

切片是引用类型,将一个切片赋值给另一个切片时,传递的是一份浅拷贝,函数传参操作不会拷贝整个切片,只会复制len和cap,底层共用同一个数组,不会占用额外的内存,函数内对数组元素值的修改,会修改原数组内容。

3)计算数组长度方式不同

数组需要遍历计算数组长度,时间复杂度为O(n)

切片底层包含len字段,可以通过len()计算切片长度,时间复杂度为O(1)

Go slice深拷贝和浅拷贝

深拷贝:拷贝的是数据本身,创造一个新对象,新创建的对象与原对象不共享内存,新创建的对象在内存中开辟一个新的内存地址,新对象值修改时不会影响原对象值

实现深拷贝的方式:

  1. copy(slice2, slice1)
  2. 遍历append赋值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
slice1 := []int{1, 2, 3, 4, 5}
slice2 := make([]int, 5, 5)
fmt.Printf("slice1: %v, %p\n", slice1, slice1)
copy(slice2, slice1)
fmt.Printf("slice2: %v, %p\n", slice2, slice2)
slice3 := make([]int, 0, 5)
for _, v := range slice1 {
slice3 = append(slice3, v)
}
fmt.Printf("slice3: %v, %p\n", slice3, slice3)
}

slice1: [1 2 3 4 5], 0xc0000b0030
slice2: [1 2 3 4 5], 0xc0000b0060
slice3: [1 2 3 4 5], 0xc0000b0090

浅拷贝:拷贝的是数据地址,只复制指向的对象的指针,此时新对象和老对象指向的内存地址是一样的,新对象值修改时老对象也会变化

实现浅拷贝的方式:

引用类型的变量,默认赋值操作就是浅拷贝

  1. slice2 := slice1
1
2
3
4
5
6
7
8
9
func main() {
slice1 := []int{1, 2, 3, 4, 5}
fmt.Printf("slice1: %v, %p\n", slice1, slice1)
slice2 := slice1
fmt.Printf("slice2: %v, %p\n", slice2, slice2)
}

slice1: [1 2 3 4 5], 0xc00001a120
slice2: [1 2 3 4 5], 0xc00001a120

Go slice扩容机制?

扩容会发生在slice append的时候,当slice的cap不足以容纳新元素,就会进行扩容,扩容规则如下

  • 如果原有 slice 长度小于 1024, 那么每次就扩容为原来的 2 倍
  • 如果原 slice 长度大于等于 1024, 那么每次扩容就扩为原来的 1.25 倍

补充

如果go 1.18+,原来的slice 容量oldcap小于256的时候,新 slice 的容量newcap是oldcap 的2倍;当oldcap容量大于等于 256 的时候,newcap会有个计算公式:newcap += (newcap +3*threshold) / 4 再对 newcap 作了一个内存对齐,这个和内存分配策略相关。进行内存对齐之后,新 slice 的容量是要大于等于按照前半部分生成的newcap。

Go slice为什么不是线程安全的?

先看下线程安全的定义

多个线程访问同一个对象时,调用这个对象的行为都可以获得正确的结果,那么这个对象就是线程安全的。

若有多个线程同时执行写操作,一般都需要考虑线程同步,否则的话就可能影响线程安全。

再看Go语言实现线程安全常用的几种方式

  1. 互斥锁
  2. 读写锁
  3. 原子操作
  4. sync.once
  5. sync.atomic
  6. channel

slice底层结构并没有使用加锁等方式,不支持并发读写,所以并不是线程安全的,使用多个 goroutine 对类型为 slice 的变量进行操作,每次输出的值大概率都不会一样,与预期值不一致; slice在并发执行中不会报错,但是数据会丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 切片非并发安全
* 多次执行,每次得到的结果都不一样
* 可以考虑使用 channel 本身的特性 (阻塞) 来实现安全的并发读写
*/
func TestSliceConcurrencySafe(t *testing.T) {
a := make([]int, 0)
var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
go func(i int) {
a = append(a, i)
wg.Done()
}(i)
}
wg.Wait()
t.Log(len(a))
// not equal 10000
}

Map

Go map的底层实现原理?

Go中的map是一个指针,占用8个字节,指向hmap结构体

源码包中src/runtime/map.go定义了hmap的数据结构:

hmap包含若干个结构为bmap的数组,每个bmap底层都采用链表结构,bmap通常叫其bucket

图片

hmap结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// A header for a Go map.
type hmap struct {
count int
// 代表哈希表中的元素个数,调用len(map)时,返回的就是该字段值。
flags uint8
// 状态标志(是否处于正在写入的状态等)
B uint8
// buckets(桶)的对数
// 如果B=5,则buckets数组的长度 = 2^B=32,意味着有32个桶
noverflow uint16
// 溢出桶的数量
hash0 uint32
// 生成hash的随机数种子
buckets unsafe.Pointer
// 指向buckets数组的指针,数组大小为2^B,如果元素个数为0,它为nil。
oldbuckets unsafe.Pointer
// 如果发生扩容,oldbuckets是指向老的buckets数组的指针,老的buckets数组大小是新的buckets的1/2;非扩容状态下,它为nil。
nevacuate uintptr
// 表示扩容进度,小于此地址的buckets代表已搬迁完成。
extra *mapextra
// 存储溢出桶,这个字段是为了优化GC扫描而设计的,下面详细介绍
}

bmap结构体

bmap 就是我们常说的“桶”,一个桶里面会最多装 8 个 key,这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果的低B位是相同的,关于key的定位我们在map的查询中详细说明。在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有8个位置)。

1
2
3
4
5
6
7
// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8
// len为8的数组
// 用来快速定位key是否在这个bmap中
// 一个桶最多8个槽位,如果key所在的tophash值在tophash中,则代表该key在这个桶中
}

上面bmap结构是静态结构,在编译过程中runtime.bmap会拓展成以下结构体:

1
2
3
4
5
6
7
8
9
type bmap struct{
tophash [8]uint8
keys [8]keytype
// keytype 由编译器编译时候确定
values [8]elemtype
// elemtype 由编译器编译时候确定
overflow uintptr
// overflow指向下一个bmap,overflow是uintptr而不是*bmap类型,保证bmap完全不含指针,是为了减少gc,溢出桶存储到extra字段中
}

tophash就是用于实现快速定位key的位置,在实现过程中会使用key的hash值的高8位作为tophash值,存放在bmap的tophash字段中

tophash字段不仅存储key哈希值的高8位,还会存储一些状态值,用来表明当前桶单元状态,这些状态值都是小于minTopHash的

为了避免key哈希值的高8位值和这些状态值相等,产生混淆情况,所以当key哈希值高8位若小于minTopHash时候,自动将其值加上minTopHash作为该key的tophash。桶单元的状态值如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
emptyRest      = 0 // 表明此桶单元为空,且更高索引的单元也是空
emptyOne = 1 // 表明此桶单元为空
evacuatedX = 2 // 用于表示扩容迁移到新桶前半段区间
evacuatedY = 3 // 用于表示扩容迁移到新桶后半段区间
evacuatedEmpty = 4 // 用于表示此单元已迁移
minTopHash = 5 // key的tophash值与桶状态值分割线值,小于此值的一定代表着桶单元的状态,大于此值的一定是key对应的tophash值

func tophash(hash uintptr) uint8 {
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
return top
}

mapextra结构体

当map的key和value都不是指针类型时候,bmap将完全不包含指针,那么gc时候就不用扫描bmap。bmap指向溢出桶的字段overflow是uintptr类型,为了防止这些overflow桶被gc掉,所以需要mapextra.overflow将它保存起来。如果bmap的overflow是*bmap类型,那么gc扫描的是一个个拉链表,效率明显不如直接扫描一段内存(hmap.mapextra.overflow)

1
2
3
4
5
6
7
8
type mapextra struct {
overflow *[]*bmap
// overflow 包含的是 hmap.buckets 的 overflow 的 buckets
oldoverflow *[]*bma
// oldoverflow 包含扩容时 hmap.oldbuckets 的 overflow 的 bucket
nextOverflow *bmap
// 指向空闲的 overflow bucket 的指针
}

总结

bmap(bucket)内存数据结构可视化如下:

注意到 key 和 value 是各自放在一起的,并不是 key/value/key/value/... 这样的形式,当key和value类型不一样的时候,key和value占用字节大小不一样,使用key/value这种形式可能会因为内存对齐导致内存空间浪费,所以Go采用key和value分开存储的设计,更节省内存空间

图片

Go map遍历为什么是无序的?

使用 range 多次遍历 map 时输出的 key 和 value 的顺序可能不同。这是 Go 语言的设计者们有意为之,旨在提示开发者们,Go 底层实现并不保证 map 遍历顺序稳定,请大家不要依赖 range 遍历结果顺序

主要原因有2点:

  • map在遍历时,并不是从固定的0号bucket开始遍历的,每次遍历,都会从一个随机值序号的bucket,再从其中随机的cell开始遍历
  • map遍历时,是按序遍历bucket,同时按需遍历bucket中和其overflow bucket中的cell。但是map在扩容后,会发生key的搬迁,这造成原来落在一个bucket中的key,搬迁后,有可能会落到其他bucket中了,从这个角度看,遍历map的结果就不可能是按照原来的顺序了

map 本身是无序的,且遍历时顺序还会被随机化,如果想顺序遍历 map,需要对 map key 先排序,再按照 key 的顺序遍历 map。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func TestMapRange(t *testing.T) {
m := map[int]string{1: "a", 2: "b", 3: "c"}
t.Log("first range:")
for i, v := range m {
t.Logf("m[%v]=%v ", i, v)
}
t.Log("\nsecond range:")
for i, v := range m {
t.Logf("m[%v]=%v ", i, v)
}

// 实现有序遍历
var sl []int
// 把 key 单独取出放到切片
for k := range m {
sl = append(sl, k)
}
// 排序切片
sort.Ints(sl)
// 以切片中的 key 顺序遍历 map 就是有序的了
for _, k := range sl {
t.Log(k, m[k])
}
}

Go map为什么是非线程安全的?

map默认是并发不安全的,同时对map进行并发读写时,程序会panic,原因如下:

Go 官方在经过了长时间的讨论后,认为 Go map 更应适配典型使用场景(不需要从多个 goroutine 中进行安全访问),而不是为了小部分情况(并发访问),导致大部分程序付出加锁代价(性能),决定了不支持。

场景: 2个协程同时读和写,以下程序会出现致命错误:fatal error: concurrent map writes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"time"
)

func main() {
s := make(map[int]int)
for i := 0; i < 100; i++ {
go func(i int) {
s[i] = i
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Printf("map第%d个元素值是%d\n", i, s[i])
}(i)
}
time.Sleep(1 * time.Second)
}

如果想实现map线程安全,有两种方式:

方式一:使用读写锁 map + sync.RWMutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var lock sync.RWMutex
s := make(map[int]int)
for i := 0; i < 100; i++ {
go func(i int) {
lock.Lock()
s[i] = i
lock.Unlock()
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
lock.RLock()
fmt.Printf("map第%d个元素值是%d\n", i, s[i])
lock.RUnlock()
}(i)
}
time.Sleep(1 * time.Second)
}

方式二:使用Go提供的 sync.Map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var m sync.Map
for i := 0; i < 100; i++ {
go func(i int) {
m.Store(i, i)
}(i)
}
for i := 0; i < 100; i++ {
go func(i int) {
v, ok := m.Load(i)
fmt.Printf("Load: %v, %v\n", v, ok)
}(i)
}
time.Sleep(1 * time.Second)
}

Go map如何查找?

Go 语言中读取 map 有两种语法:带 comma 和 不带 comma。当要查询的 key 不在 map 里,带 comma 的用法会返回一个 bool 型变量提示 key 是否在 map 中;而不带 comma 的语句则会返回一个 value 类型的零值。如果 value 是 int 型就会返回 0,如果 value 是 string 类型,就会返回空字符串。

1
2
3
4
5
6
7
8
9
// 不带 comma 用法
value := m["name"]
fmt.Printf("value:%s", value)

// 带 comma 用法
value, ok := m["name"]
if ok {
fmt.Printf("value:%s", value)
}

map的查找通过生成汇编码可以知道,根据 key 的不同类型/返回参数,编译器会将查找函数用更具体的函数替换,以优化效率:

key 类型 查找
uint32 mapaccess1_fast32(t maptype, h hmap, key uint32) unsafe.Pointer
uint32 mapaccess2_fast32(t maptype, h hmap, key uint32) (unsafe.Pointer, bool)
uint64 mapaccess1_fast64(t maptype, h hmap, key uint64) unsafe.Pointer
uint64 mapaccess2_fast64(t maptype, h hmap, key uint64) (unsafe.Pointer, bool)
string mapaccess1_faststr(t maptype, h hmap, ky string) unsafe.Pointer
string mapaccess2_faststr(t maptype, h hmap, ky string) (unsafe.Pointer, bool)

查找流程

img

1.写保护监测

函数首先会检查 map 的标志位 flags。如果 flags 的写标志位此时被置 1 了,说明有其他协程在执行“写”操作,进而导致程序 panic,这也说明了 map 不是线程安全的

1
2
3
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}

2.计算hash值

1
hash := t.hasher(key, uintptr(h.hash0))

key经过哈希函数计算后,得到的哈希值如下(主流64位机下共 64 个 bit 位), 不同类型的key会有不同的hash函数

1
10010111 | 000011110110110010001111001010100010010110010101010 │ 01010

3.找到hash对应的bucket

bucket定位:哈希值的低B个bit 位,用来定位key所存放的bucket

如果当前正在扩容中,并且定位到的旧bucket数据还未完成迁移,则使用旧的bucket(扩容前的bucket)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
hash := t.hasher(key, uintptr(h.hash0))
// 桶的个数m-1,即 1<<B-1,B=5时,则有0~31号桶
m := bucketMask(h.B)
// 计算哈希值对应的bucket
// t.bucketsize为一个bmap的大小,通过对哈希值和桶个数取模得到桶编号,通过对桶编号和buckets起始地址进行运算,获取哈希值对应的bucket
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 是否在扩容
if c := h.oldbuckets; c != nil {
// 桶个数已经发生增长一倍,则旧bucket的桶个数为当前桶个数的一半
if !h.sameSizeGrow() {
// There used to be half as many buckets; mask down one more power of two.
m >>= 1
}
// 计算哈希值对应的旧bucket
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果旧bucket的数据没有完成迁移,则使用旧bucket查找
if !evacuated(oldb) {
b = oldb
}
}

4. 遍历bucket查找

tophash值定位:哈希值的高8个bit 位,用来快速判断key是否已在当前bucket中(如果不在的话,需要去bucket的overflow中查找)

用步骤2中的hash值,得到高8个bit位,也就是10010111,转化为十进制,也就是151

1
2
3
4
5
6
7
8
top := tophash(hash)
func tophash(hash uintptr) uint8 {
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
return top
}

上面函数中hash是64位的,sys.PtrSize值是8,所以top := uint8(hash >> (sys.PtrSize*8 - 8))等效top = uint8(hash >> 56),最后top取出来的值就是hash的高8位值

在 bucket 及bucket的overflow中寻找tophash 值(HOB hash)为 151* 的 槽位,即为key所在位置,找到了空槽位或者 2 号槽位,这样整个查找过程就结束了,其中找到空槽位代表没找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
// 未被使用的槽位,插入
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 找到tophash值对应的的key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
return e
}
}
}

img

5. 返回key对应的指针

如果通过上面的步骤找到了key对应的槽位下标 i,我们再详细分析下key/value值是如何获取的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// keys的偏移量
dataOffset = unsafe.Offsetof(struct{
b bmap
v int64
}{}.v)

// 一个bucket的元素个数
bucketCnt = 8

// key 定位公式
k :=add(unsafe.Pointer(b),dataOffset+i*uintptr(t.keysize))

// value 定位公式
v:= add(unsafe.Pointer(b),dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

bucket 里 keys 的起始地址就是 unsafe.Pointer(b)+dataOffset

第 i 个下标 key 的地址就要在此基础上跨过 i 个 key 的大小;

而我们又知道,value 的地址是在所有 key 之后,因此第 i 个下标 value 的地址还需要加上所有 key 的偏移。

Go map冲突的解决方式?

比较常用的Hash冲突解决方案有链地址法和开放寻址法:

链地址法

当哈希冲突发生时,创建新单元,并将新单元添加到冲突单元所在链表的尾部。

开放寻址法

当哈希冲突发生时,从发生冲突的那个单元起,按照一定的次序,从哈希表中寻找一个空闲的单元,然后把发生冲突的元素存入到该单元。开放寻址法需要的表长度要大于等于所需要存放的元素数量

开放寻址法有多种方式:线性探测法、平方探测法、随机探测法和双重哈希法。这里以线性探测法来帮助读者理解开放寻址法思想

线性探测法

Hash(key) 表示关键字 key 的哈希值, 表示哈希表的槽位数(哈希表的大小)。

线性探测法则可以表示为:

如果 Hash(x) % M 已经有数据,则尝试 (Hash(x) + 1) % M ;

如果 (Hash(x) + 1) % M 也有数据了,则尝试 (Hash(x) + 2) % M ;

如果 (Hash(x) + 2) % M 也有数据了,则尝试 (Hash(x) + 3) % M ;

两种解决方案比较

对于链地址法,基于数组 + 链表进行存储,链表节点可以在需要时再创建,不必像开放寻址法那样事先申请好足够内存,因此链地址法对于内存的利用率会比开方寻址法高。链地址法对装载因子的容忍度会更高,并且适合存储大对象、大数据量的哈希表。而且相较于开放寻址法,它更加灵活,支持更多的优化策略,比如可采用红黑树代替链表。但是链地址法需要额外的空间来存储指针。

对于开放寻址法,它只有数组一种数据结构就可完成存储,继承了数组的优点,对CPU缓存友好,易于序列化操作。但是它对内存的利用率不如链地址法,且发生冲突时代价更高。当数据量明确、装载因子小,适合采用开放寻址法。

总结

在发生哈希冲突时,Python中dict采用的开放寻址法,Java的HashMap采用的是链地址法,而Go map也采用链地址法解决冲突,具体就是插入key到map中时,当key定位的桶填满8个元素后(这里的单元就是桶,不是元素),将会创建一个溢出桶,并且将溢出桶插入当前桶所在链表尾部。

1
2
3
4
5
6
7
8
if inserti == nil {
// all current buckets are full, allocate a new one.
newb := h.newoverflow(t, b)
// 创建一个新的溢出桶
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}

Go map 的负载因子为什么是 6.5?

什么是负载因子?

负载因子(load factor),用于衡量当前哈希表中空间占用率的核心指标,也就是每个 bucket 桶存储的平均元素个数。

1
负载因子 = 哈希表存储的元素个数/桶个数

另外负载因子与扩容、迁移等重新散列(rehash)行为有直接关系:

  • 在程序运行时,会不断地进行插入、删除等,会导致 bucket 不均,内存利用率低,需要迁移。
  • 在程序运行时,出现负载因子过大,需要做扩容,解决 bucket 过大的问题。

负载因子是哈希表中的一个重要指标,在各种版本的哈希表实现中都有类似的东西,主要目的是为了平衡 buckets 的存储空间大小和查找元素时的性能高低

在接触各种哈希表时都可以关注一下,做不同的对比,看看各家的考量。

为什么是 6.5?

为什么 Go 语言中哈希表的负载因子是 6.5,为什么不是 8 ,也不是 1。这里面有可靠的数据支撑吗?

测试报告

实际上这是 Go 官方的经过认真的测试得出的数字,一起来看看官方的这份测试报告。

报告中共包含 4 个关键指标,如下:

loadFactor %overflow bytes/entry hitprobe missprobe
4.00 2.13 20.77 3.00 4.00
4.50 4.05 17.30 3.25 4.50
5.00 6.85 14.77 3.50 5.00
5.50 10.55 12.94 3.75 5.50
6.00 15.27 11.67 4.00 6.00
6.50 20.90 10.79 4.25 6.50
7.00 27.14 10.15 4.50 7.00
7.50 34.03 9.73 4.75 7.50
8.00 41.10 9.40 5.00 8.00
  • loadFactor:负载因子,也有叫装载因子。
  • %overflow:溢出率,有溢出 bukcet 的百分比。
  • bytes/entry:平均每对 key/value 的开销字节数.
  • hitprobe:查找一个存在的 key 时,要查找的平均个数。
  • missprobe:查找一个不存在的 key 时,要查找的平均个数。

选择数值

Go 官方发现:装载因子越大,填入的元素越多,空间利用率就越高,但发生哈希冲突的几率就变大。反之,装载因子越小,填入的元素越少,冲突发生的几率减小,但空间浪费也会变得更多,而且还会提高扩容操作的次数

根据这份测试结果和讨论,Go 官方取了一个相对适中的值,把 Go 中的 map 的负载因子硬编码为 6.5,这就是 6.5 的选择缘由。

这意味着在 Go 语言中,当 map存储的元素个数大于或等于 6.5 * 桶个数 时,就会触发扩容行为

Go map如何扩容?

扩容时机:

向 map 插入新 key 的时候,会进行条件检测,符合下面这 2 个条件,就会触发扩容

1
2
3
4
5
6
7
8
9
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}

// 判断是否在扩容
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}

扩容条件:

条件1:超过负载

map元素个数 > 6.5 * 桶个数

1
2
3
4
5
6
7
8
9
10
11
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactor*bucketShift(B)
}

/*
其中

bucketCnt = 8,一个桶可以装的最大元素个数
loadFactor = 6.5,负载因子,平均每个桶的元素个数
bucketShift(B): 桶的个数
*/

条件2:溢出桶太多

当桶总数 < 2 ^ 15 时,如果溢出桶总数 >= 桶总数,则认为溢出桶过多。

当桶总数 >= 2 ^ 15 时,直接与 2 ^ 15 比较,当溢出桶总数 >= 2 ^ 15 时,即认为溢出桶太多了。

1
2
3
4
5
6
7
8
9
10
11
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
// If the threshold is too low, we do extraneous work.
// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
// "too many" means (approximately) as many overflow buckets as regular buckets.
// See incrnoverflow for more details.
if B > 15 {
B = 15
}
// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
return noverflow >= uint16(1)<<(B&15)
}

对于条件2,其实算是对条件1的补充。因为在负载因子比较小的情况下,有可能 map 的查找和插入效率也很低,而第 1 点识别不出来这种情况。

表面现象就是负载因子比较小比较小,即 map 里元素总数少,但是桶数量多(真实分配的桶数量多,包括大量的溢出桶)。比如不断的增删,这样会造成overflow的bucket数量增多,但负载因子又不高,达不到第 1 点的临界值,就不能触发扩容来缓解这种情况。这样会造成桶的使用率不高,值存储得比较稀疏,查找插入效率会变得非常低,因此有了第 2 扩容条件。

扩容机制:

双倍扩容:针对条件1,新建一个buckets数组,新的buckets大小是原来的2倍,然后旧buckets数据搬迁到新的buckets。该方法我们称之为双倍扩容

**等量扩容:**针对条件2,并不扩大容量,buckets数量维持不变,重新做一遍类似双倍扩容的搬迁动作,把松散的键值对重新排列一次,使得同一个 bucket 中的 key 排列地更紧密,节省空间,提高 bucket 利用率,进而保证更快的存取。该方法我们称之为**等量扩容**。

扩容函数:

上面说的 hashGrow() 函数实际上并没有真正地“搬迁”,它只是分配好了新的 buckets,并将老的 buckets 挂到了 oldbuckets 字段上。真正搬迁 buckets 的动作在 growWork() 函数中,而调用 growWork() 函数的动作是在 mapassign 和 mapdelete 函数中。也就是插入或修改、删除 key 的时候,都会尝试进行搬迁 buckets 的工作。先检查 oldbuckets 是否搬迁完毕,具体来说就是检查 oldbuckets 是否为 nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func hashGrow(t *maptype, h *hmap) {
// 如果达到条件 1,那么将B值加1,相当于是原来的2倍
// 否则对应条件 2,进行等量扩容,所以 B 不变
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
// 记录老的buckets
oldbuckets := h.buckets
// 申请新的buckets空间
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
// 注意&^ 运算符,这块代码的逻辑是转移标志位
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交grow (atomic wrt gc)
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
// 搬迁进度为0
h.nevacuate = 0
// overflow buckets 数为0
h.noverflow = 0

// 如果发现hmap是通过extra字段 来存储 overflow buckets时
if h.extra != nil && h.extra.overflow != nil {
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}

由于 map 扩容需要将原有的 key/value 重新搬迁到新的内存地址,如果map存储了数以亿计的key-value,一次性搬迁将会造成比较大的延时,因此 Go map 的扩容采取了一种称为“渐进式”的方式,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket。

1
2
3
4
5
6
7
8
9
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 为了确认搬迁的 bucket 是我们正在使用的 bucket
// 即如果当前key映射到老的bucket1,那么就搬迁该bucket1。
evacuate(t, h, bucket&h.oldbucketmask())
// 如果还未完成扩容工作,则再搬迁一个bucket。
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}

Go map和sync.Map谁的性能好,为什么?

Go 语言的 sync.Map 支持并发读写,采取了 “空间换时间” 的机制,冗余了两个数据结构,分别是:read 和 dirty

1
2
3
4
5
6
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}

对比原始map:

和原始map+RWLock的实现并发的方式相比,减少了加锁对性能的影响。它做了一些优化:可以无锁访问read map,而且会优先操作read map,倘若只操作read map就可以满足要求,那就不用去操作write map(dirty),所以在某些特定场景中它发生锁竞争的频率会远远小于map+RWLock的实现方式

优点:

适合读多写少的场景

缺点:

写多的场景,会导致 read map 缓存失效,需要加锁,冲突变多,性能急剧下降

Channel

Go channel的底层实现原理

概念:

Go中的channel 是一个队列,遵循先进先出的原则,负责协程之间的通信(Go 语言提倡不要通过共享内存来通信,而要通过通信来实现内存共享,CSP(Communicating Sequential Process)并发模型,就是通过 goroutine 和 channel 来实现的)

使用场景:

停止信号监听

定时任务

生产方和消费方解耦

控制并发数

底层数据结构:

通过var声明或者make函数创建的channel变量是一个存储在函数栈帧上的指针,占用8个字节,指向堆上的hchan结构体

源码包中src/runtime/chan.go定义了hchan的数据结构:

hchan

hchan结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type hchan struct {
closed uint32 // channel是否关闭的标志
elemtype *_type // channel中的元素类型

// channel分为无缓冲和有缓冲两种。
// 对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组
// 为啥是循环数组?普通数组不行吗,普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部都前移
// 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
buf unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区)
qcount uint // 循环数组中的元素数量
dataqsiz uint // 循环数组的长度
elemsize uint16 // 元素的大小
sendx uint // 下一次写下标的位置
recvx uint // 下一次读下标的位置

// 尝试读取channel或向channel写入数据而被阻塞的goroutine
recvq waitq // 读等待队列
sendq waitq // 写等待队列

lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}

等待队列:

双向链表,包含一个头结点和一个尾结点

每个节点是一个sudog结构体变量,记录哪个协程在等待,等待的是哪个channel,等待发送/接收的数据在哪里

1
2
3
4
5
6
7
8
9
10
11
12
13
type waitq struct {
first *sudog
last *sudog
}

type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
c *hchan
...
}

操作

创建

使用 make(chan T, cap) 来创建 channel,make 语法会在编译时,转换为 makechan64makechan

1
2
3
4
5
6
7
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}

return makechan(t, int(size))
}

创建channel 有两种,一种是带缓冲的channel,一种是不带缓冲的channel

1
2
3
4
// 带缓冲
ch := make(chan int, 3)
// 不带缓冲
ch := make(chan int)

创建时会做一些检查:

  • 元素大小不能超过 64K
  • 元素的对齐大小不能超过 maxAlign 也就是 8 字节
  • 计算出来的内存是否超过限制

创建时的策略:

  • 如果是无缓冲的 channel,会直接给 hchan 分配内存
  • 如果是有缓冲的 channel,并且元素不包含指针,那么会为 hchan 和底层数组分配一段连续的地址
  • 如果是有缓冲的 channel,并且元素包含指针,那么会为 hchan 和底层数组分别分配地址

发送

发送操作,编译时转换为runtime.chansend函数

1
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 

阻塞式:

调用chansend函数,并且block=true

1
ch <- 10

非阻塞式:

调用chansend函数,并且block=false

1
2
3
4
5
6
select {
case ch <- 10:
...

default
}

向 channel 中发送数据时大概分为两大块:检查和数据发送,数据发送流程如下:

  • 如果 channel 的读等待队列存在接收者goroutine
    • 将数据直接发送给第一个等待的 goroutine, 唤醒接收的 goroutine
  • 如果 channel 的读等待队列不存在接收者goroutine
    • 如果循环数组buf未满,那么将会把数据发送到循环数组buf的队尾
    • 如果循环数组buf已满,这个时候就会走阻塞发送的流程,将当前 goroutine 加入写等待队列,并挂起等待唤醒

接收

发送操作,编译时转换为runtime.chanrecv函数

1
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 

阻塞式:

调用chanrecv函数,并且block=true

1
2
3
4
5
6
7
8
9
10
<ch

v := <ch

v, ok := <ch

// 当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值
for i := range ch {
fmt.Println(i)
}

非阻塞式:

调用chanrecv函数,并且block=false

1
2
3
4
5
6
select {
case <-ch:
...

default
}

向 channel 中接收数据时大概分为两大块,检查和数据发送,而数据接收流程如下:

  • 如果 channel 的写等待队列存在发送者goroutine
    • 如果是无缓冲 channel,直接从第一个发送者goroutine那里把数据拷贝给接收变量,唤醒发送的 goroutine
    • 如果是有缓冲 channel(已满),将循环数组buf的队首元素拷贝给接收变量,将第一个发送者goroutine的数据拷贝到 buf循环数组队尾,唤醒发送的 goroutine
  • 如果 channel 的写等待队列不存在发送者goroutine
    • 如果循环数组buf非空,将循环数组buf的队首元素拷贝给接收变量
    • 如果循环数组buf为空,这个时候就会走阻塞接收的流程,将当前 goroutine 加入读等待队列,并挂起等待唤醒

关闭

关闭操作,调用close函数,编译时转换为runtime.closechan函数

1
close(ch)
1
func closechan(c *hchan) 

案例分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"time"
"unsafe"
)

func main() {
// ch是长度为4的带缓冲的channel
// 初始hchan结构体重的buf为空,sendx和recvx均为0
ch := make(chan string, 4)
fmt.Println(ch, unsafe.Sizeof(ch))
go sendTask(ch)
go receiveTask(ch)
time.Sleep(1 * time.Second)
}

// G1是发送者
// 当G1向ch里发送数据时,首先会对buf加锁,然后将task存储的数据copy到buf中,然后sendx++,然后释放对buf的锁
func sendTask(ch chan string) {
taskList := []string{"this", "is", "a", "demo"}
for _, task := range taskList {
ch <- task //发送任务到channel
}
}

// G2是接收者
// 当G2消费ch的时候,会首先对buf加锁,然后将buf中的数据copy到task变量对应的内存里,然后recvx++,并释放锁
func receiveTask(ch chan string) {
for {
task := <-ch //接收任务
fmt.Println("received", task) //处理任务
}
}

总结hchan结构体的主要组成部分有四个:

  • 用来保存goroutine之间传递数据的循环数组:buf
  • 用来记录此循环数组当前发送或接收数据的下标值:sendx和recvx
  • 用于保存向该chan发送和从该chan接收数据被阻塞的goroutine队列: sendq 和 recvq
  • 保证channel写入和读取数据时线程安全的锁:lock

Go channel有什么特点?

channel有2种类型:无缓冲、有缓冲

channel有3种模式:写操作模式(单向通道)、读操作模式(单向通道)、读写操作模式(双向通道)

写操作模式 读操作模式 读写操作模式
创建 make(chan<- int) make(<-chan int) make(chan int)

channel有3种状态:未初始化、正常、关闭

未初始化 关闭 正常
关闭 panic panic 正常关闭
发送 永远阻塞导致死锁 panic 阻塞或者成功发送
接收 永远阻塞导致死锁 缓冲区为空则为零值, 否则可以继续读 阻塞或者成功接收

注意点

  1. 一个 channel不能多次关闭,会导致painc
  2. 如果多个 goroutine 都监听同一个 channel,那么 channel 上的数据都可能随机被某一个 goroutine 取走进行消费
  3. 如果多个 goroutine 监听同一个 channel,如果这个 channel 被关闭,则所有 goroutine 都能收到退出信号

Go channel有无缓冲的区别?

无缓冲:一个送信人去你家送信,你不在家他不走,你一定要接下信,他才会走。

有缓冲:一个送信人去你家送信,扔到你家的信箱转身就走,除非你的信箱满了,他必须等信箱有多余空间才会走。

无缓冲 有缓冲
创建方式 make(chan TYPE) make(chan TYPE, SIZE)
发送阻塞 数据接收前发送阻塞 缓冲满时发送阻塞
接收阻塞 数据发送前接收阻塞 缓冲空时接收阻塞

非缓冲 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"time"
)

func loop(ch chan int) {
for {
select {
case i := <-ch:
fmt.Println("this value of unbuffer channel", i)
}
}
}

func main() {
ch := make(chan int)
ch <- 1
go loop(ch)
time.Sleep(1 * time.Millisecond)
}

这里会报错 fatal error: all goroutines are asleep - deadlock! 就是因为 ch<-1 发送了,但是同时没有接收者,所以就发生了阻塞

但如果我们把 ch <- 1 放到 go loop(ch) 下面,程序就会正常运行

缓冲 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"time"
)

func loop(ch chan int) {
for {
select {
case i := <-ch:
fmt.Println("this value of unbuffer channel", i)
}
}
}

func main() {
ch := make(chan int,3)
ch <- 1
ch <- 2
ch <- 3
ch <- 4
go loop(ch)
time.Sleep(1 * time.Millisecond)
}

这里也会报 fatal error: all goroutines are asleep - deadlock! ,这是因为 channel 的大小为 3 ,而我们要往里面塞 4 个数据,所以就会阻塞住,解决的办法有两个:

  1. 把 channel 长度调大一点
  2. 把 channel 的信息发送者 ch <- 1 这些代码移动到 go loop(ch) 下面 ,让 channel 实时消费就不会导致阻塞了

Go channel为什么是线程安全的?

为什么设计成线程安全?

不同协程通过channel进行通信,本身的使用场景就是多线程,为了保证数据的一致性,必须实现线程安全

如何实现线程安全的?

channel的底层实现中,hchan结构体中采用Mutex锁来保证数据读写安全。在对循环数组buf中的数据进行入队和出队操作时,必须先获取互斥锁,才能操作channel数据

Go channel如何控制goroutine并发执行顺序?

多个goroutine并发执行时,每一个goroutine抢到处理器的时间点不一致,gorouine的执行本身不能保证顺序。即代码中先写的gorouine并不能保证先执行

思路:使用channel进行通信通知,用channel去传递信息,从而控制并发执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"sync"
"time"
)

var wg sync.WaitGroup

func main() {
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
ch3 := make(chan struct{}, 1)
ch1 <- struct{}{}
wg.Add(3)
start := time.Now().Unix()
go print("gorouine1", ch1, ch2)
go print("gorouine2", ch2, ch3)
go print("gorouine3", ch3, ch1)
wg.Wait()
end := time.Now().Unix()
fmt.Printf("duration:%d\n", end-start)
}

func print(gorouine string, inputchan chan struct{}, outchan chan struct{}) {
// 模拟内部操作耗时
time.Sleep(1 * time.Second)
select {
case <-inputchan:
fmt.Printf("%s\n", gorouine)
outchan <- struct{}{}
}
wg.Done()
}
/*
gorouine1
gorouine2
gorouine3
duration:1
*/

Go channel共享内存有什么优劣势?

“不要通过共享内存来通信,我们应该使用通信来共享内存” 这句话想必大家已经非常熟悉了,在官方的博客,初学时的教程,甚至是在 Go 的源码中都能看到

无论是通过共享内存来通信还是通过通信来共享内存,最终我们应用程序都是读取的内存当中的数据,只是前者是直接读取内存的数据,而后者是通过发送消息的方式来进行同步。而通过发送消息来同步的这种方式常见的就是 Go 采用的 CSP(Communication Sequential Process) 模型以及 Erlang 采用的 Actor 模型,这两种方式都是通过通信来共享内存。

02_Go进阶03_blog_channel.png

大部分的语言采用的都是第一种方式直接去操作内存,然后通过互斥锁,CAS 等操作来保证并发安全。Go 引入了 Channel 和 Goroutine 实现 CSP 模型将生产者和消费者进行了解耦,Channel 其实和消息队列很相似。而 Actor 模型和 CSP 模型都是通过发送消息来共享内存,但是它们之间最大的区别就是 Actor 模型当中并没有一个独立的 Channel 组件,而是 Actor 与 Actor 之间直接进行消息的发送与接收,每个 Actor 都有一个本地的“信箱”消息都会先发送到这个“信箱当中”。

优点

  • 使用 channel 可以帮助我们解耦生产者和消费者,可以降低并发当中的耦合

缺点

  • 容易出现死锁的情况

Go channel发送和接收什么情况下会死锁?

死锁:

  • 单个协程永久阻塞
  • 两个或两个以上的协程的执行过程中,由于竞争资源或由于彼此通信而造成的一种阻塞的现象。

channel死锁场景:

  • 非缓存channel只写不读
  • 非缓存channel读在写后面
  • 缓存channel写入超过缓冲区数量
  • 空读
  • 多个协程互相等待

1、非缓存channel只写不读

1
2
3
4
func deadlock1() {
ch := make(chan int)
ch <- 3 // 这里会发生一直阻塞的情况,执行不到下面一句
}

2、非缓存channel读在写后面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func deadlock2() {
ch := make(chan int)
ch <- 3 // 这里会发生一直阻塞的情况,执行不到下面一句
num := <-ch
fmt.Println("num=", num)
}

func deadlock2() {
ch := make(chan int)
ch <- 100 // 这里会发生一直阻塞的情况,执行不到下面一句
go func() {
num := <-ch
fmt.Println("num=", num)
}()
time.Sleep(time.Second)
}

3、缓存channel写入超过缓冲区数量

1
2
3
4
5
6
7
func deadlock3() {
ch := make(chan int, 3)
ch <- 3
ch <- 4
ch <- 5
ch <- 6 // 这里会发生一直阻塞的情况
}

4、空读

1
2
3
4
5
func deadlock4() {
ch := make(chan int)
// ch := make(chan int, 1)
fmt.Println(<-ch) // 这里会发生一直阻塞的情况
}

5、多个协程互相等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func deadlock5() {
ch1 := make(chan int)
ch2 := make(chan int)
// 互相等对方造成死锁
go func() {
for {
select {
case num := <-ch1:
fmt.Println("num=", num)
ch2 <- 100
}
}
}()

for {
select {
case num := <-ch2:
fmt.Println("num=", num)
ch1 <- 300
}
}
}

Mutex

Go 互斥锁的实现原理?

Go sync包提供了两种锁类型:互斥锁sync.Mutex 和 读写互斥锁sync.RWMutex,都属于悲观锁。

概念

Mutex是互斥锁,当一个 goroutine 获得了锁后,其他 goroutine 不能获取锁(只能存在一个写者或读者,不能同时读和写)

使用场景

多个线程同时访问临界区,为保证数据的安全,锁住一些共享资源, 以防止并发访问这些共享数据时可能导致的数据不一致问题。

获取锁的线程可以正常访问临界区,未获取到锁的线程等待锁释放后可以尝试获取锁

img

底层实现结构

互斥锁对应的是底层结构是sync.Mutex结构体,,位于 src/sync/mutex.go中

1
2
3
4
type Mutex struct {  
state int32
sema uint32
}

state表示锁的状态,有锁定、被唤醒、饥饿模式等,并且是用state的二进制位来标识的,不同模式下会有不同的处理方式

mutex_state

sema表示信号量,mutex阻塞队列的定位是通过这个变量来实现的,从而实现goroutine的阻塞和唤醒

mutex_sema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
addr = &sema
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
root := semroot(addr)
root.queue(addr, s, lifo)
root.dequeue(addr)

var semtable [251]struct {
root semaRoot
...
}

type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}

type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 指向sema变量
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
...
}

操作

锁的实现一般会依赖于原子操作、信号量,通过atomic 包中的一些原子操作来实现锁的锁定,通过信号量来实现线程的阻塞与唤醒

加锁

通过原子操作cas加锁,如果加锁不成功,根据不同的场景选择自旋重试加锁或者阻塞等待被唤醒后加锁

img

1
2
3
4
5
6
7
8
func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋或阻塞获取锁
m.lockSlow()
}
解锁

通过原子操作add解锁,如果仍有goroutine在等待,唤醒等待的goroutine

mutex_unlock

1
2
3
4
5
6
7
8
func (m *Mutex) Unlock() {  
// Fast path: 幸运之路,解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Slow path:如果有等待的goroutine,唤醒等待的goroutine
m.unlockSlow()
}
}
注意点:
  • 在 Lock() 之前使用 Unlock() 会导致 panic 异常
  • 使用 Lock() 加锁后,再次 Lock() 会导致死锁(不支持重入),需Unlock()解锁后才能再加锁
  • 锁定状态与 goroutine 没有关联,一个 goroutine 可以 Lock,另一个 goroutine 可以 Unlock

Go 互斥锁正常模式和饥饿模式的区别?

在Go一共可以分为两种抢锁的模式,一种是正常模式,另外一种是饥饿模式

正常模式(非公平锁)

在刚开始的时候,是处于正常模式(Barging),也就是,当一个G1持有着一个锁的时候,G2会自旋的去尝试获取这个锁

自旋超过4次还没有能获取到锁的时候,这个G2就会被加入到获取锁的等待队列里面,并阻塞等待唤醒

正常模式下,所有等待锁的 goroutine 按照 FIFO(先进先出)顺序等待。唤醒的goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 竞争锁。新请求锁的 goroutine 具有优势:它正在 CPU 上执行,而且可能有好几个,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败,长时间获取不到锁,就会切换到饥饿模式

饥饿模式(公平锁)

当一个 goroutine 等待锁时间超过 1 毫秒时,它可能会遇到饥饿问题。 在版本1.9中,这种场景下Go Mutex 切换到饥饿模式(handoff),解决饥饿问题。

1
starving = runtime_nanotime()-waitStartTime > 1e6

正常模式下,所有等待锁的 goroutine 按照 FIFO(先进先出)顺序等待。唤醒的goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 竞争锁。新请求锁的 goroutine 具有优势:它正在 CPU 上执行,而且可能有好几个,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败,长时间获取不到锁,就会切换到饥饿模式

那么也不可能说永远的保持一个饥饿的状态,总归会有吃饱的时候,也就是总有那么一刻Mutex会回归到正常模式,那么回归正常模式必须具备的条件有以下几种:

  1. G的执行时间小于1ms
  2. 等待队列已经全部清空了

当满足上述两个条件的任意一个的时候,Mutex会切换回正常模式,而Go的抢锁的过程,就是在这个正常模式和饥饿模式中来回切换进行的。

1
2
3
4
5
delta := int32(mutexLocked - 1<<mutexWaiterShift)  
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)

总结

对于两种模式,正常模式下的性能是最好的,goroutine 可以连续多次获取锁,饥饿模式解决了取锁公平的问题,但是性能会下降,其实是性能和公平的 一个平衡模式。

Go 互斥锁允许自旋的条件?

线程没有获取到锁时常见有2种处理方式:

  • 一种是没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁也叫做自旋锁,它不用将线程阻塞起来, 适用于并发低且程序执行时间短的场景,缺点是cpu占用较高
  • 另外一种处理方式就是把自己阻塞起来,会释放CPU给其他线程,内核会将线程置为「睡眠」状态,等到锁被释放后,内核会在合适的时机唤醒该线程,适用于高并发场景,缺点是有线程上下文切换的开销

Go语言中的Mutex实现了自旋与阻塞两种场景,当满足不了自旋条件时,就会进入阻塞

允许自旋的条件:

  1. 锁已被占用,并且锁不处于饥饿模式。
  2. 积累的自旋次数小于最大自旋次数(active_spin=4)。
  3. cpu 核数大于 1。
  4. 有空闲的 P。
  5. 当前 goroutine 所挂载的 P 下,本地待运行队列为空。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {  
...
runtime_doSpin()
continue
}


func sync_runtime_canSpin(i int) bool {
if i >= active_spin
|| ncpu <= 1
|| gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

自旋:

1
2
3
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}

如果可以进入自旋状态之后就会调用 runtime_doSpin 方法进入自旋, doSpin 方法会调用 procyield(30) 执行30次 PAUSE 指令,什么都不做,但是会消耗CPU时间

Go 读写锁的实现原理?

概念

读写互斥锁RWMutex,是对Mutex的一个扩展,当一个 goroutine 获得了读锁后,其他 goroutine可以获取读锁,但不能获取写锁;当一个 goroutine 获得了写锁后,其他 goroutine既不能获取读锁也不能获取写锁(只能存在一个写者或多个读者,可以同时读)

使用场景

多于的情况(既保证线程安全,又保证性能不太差)

底层实现结构

互斥锁对应的是底层结构是sync.RWMutex结构体,,位于 src/sync/rwmutex.go中

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 复用互斥锁
writerSem uint32 // 信号量,用于写等待读
readerSem uint32 // 信号量,用于读等待写
readerCount int32 // 当前执行读的 goroutine 数量
readerWait int32 // 被阻塞的准备读的 goroutine 的数量
}
操作:

读锁的加锁与释放

1
2
func (rw *RWMutex) RLock() // 加读锁
func (rw *RWMutex) RUnlock() // 释放读锁
加读锁
1
2
3
4
5
6
7
func (rw *RWMutex) RLock() {
// 为什么readerCount会小于0呢?往下看发现writer的Lock()会对readerCount做减法操作(原子操作)
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
}

atomic.AddInt32(&rw.readerCount, 1) 调用这个原子方法,对当前在读的数量加1,如果返回负数,那么说明当前有其他写锁,这时候就调用 runtime_SemacquireMutex 休眠当前goroutine 等待被唤醒

释放读锁

解锁的时候对正在读的操作减1,如果返回值小于 0 那么说明当前有在写的操作,这个时候调用 rUnlockSlow 进入慢速通道

1
2
3
4
5
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}

被阻塞的准备读的 goroutine 的数量减1,readerWait 为 0,就表示当前没有正在准备读的 goroutine 这时候调用 runtime_Semrelease 唤醒写操作

1
2
3
4
5
6
7
func (rw *RWMutex) rUnlockSlow(r int32) {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

写锁的加锁与释放

1
2
func (rw *RWMutex) Lock() // 加写锁
func (rw *RWMutex) Unlock() // 释放写锁
加写锁
1
2
3
4
5
6
7
8
9
10
11
12
const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
}

首先调用互斥锁的 lock,获取到互斥锁之后,如果计算之后当前仍然有其他 goroutine 持有读锁,那么就调用 runtime_SemacquireMutex 休眠当前的 goroutine 等待所有的读操作完成

这里readerCount 原子性加上一个很大的负数,是防止后面的协程能拿到读锁,阻塞读

释放写锁
1
2
3
4
5
6
7
8
9
10
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
}

解锁的操作,会先调用 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) 将恢复之前写入的负数,然后根据当前有多少个读操作在等待,循环唤醒

注意点

  • 读锁或写锁在 Lock() 之前使用 Unlock() 会导致 panic 异常
  • 使用 Lock() 加锁后,再次 Lock() 会导致死锁(不支持重入),需Unlock()解锁后才能再加锁
  • 锁定状态与 goroutine 没有关联,一个 goroutine 可以 RLock(Lock),另一个 goroutine 可以 RUnlock(Unlock)

互斥锁和读写锁的区别

  • 读写锁区分读者和写者,而互斥锁不区分
  • 互斥锁同一时间只允许一个线程访问该对象,无论读写;读写锁同一时间内只允许一个写者,但是允许多个读者同时读对象。

Go 可重入锁如何实现?

概念

可重入锁又称为递归锁,是指在同一个线程在外层方法获取锁的时候,在进入该线程的内层方法时会自动获取锁,不会因为之前已经获取过还没释放再次加锁导致死锁

为什么Go语言中没有可重入锁?

Mutex 不是可重入的锁。Mutex 的实现中没有记录哪个 goroutine 拥有这把锁。理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件,并且Mutex 重复Lock会导致死锁。

如何实现可重入锁?

实现一个可重入锁需要这两点:

  • 记住持有锁的线程
  • 统计重入的次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main

import (
"bytes"
"fmt"
"runtime"
"strconv"
"sync"
"sync/atomic"
)

type ReentrantLock struct {
sync.Mutex
recursion int32 // 这个goroutine 重入的次数
owner int64 // 当前持有锁的goroutine id
}

// Get returns the id of the current goroutine.
func GetGoroutineID() int64 {
var buf [64]byte
var s = buf[:runtime.Stack(buf[:], false)]
s = s[len("goroutine "):]
s = s[:bytes.IndexByte(s, ' ')]
gid, _ := strconv.ParseInt(string(s), 10, 64)
return gid
}

func NewReentrantLock() sync.Locker {
res := &ReentrantLock{
Mutex: sync.Mutex{},
recursion: 0,
owner: 0,
}
return res
}

// ReentrantMutex 包装一个Mutex,实现可重入
type ReentrantMutex struct {
sync.Mutex
owner int64 // 当前持有锁的goroutine id
recursion int32 // 这个goroutine 重入的次数
}

func (m *ReentrantMutex) Lock() {
gid := GetGoroutineID()
// 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
if atomic.LoadInt64(&m.owner) == gid {
m.recursion++
return
}
m.Mutex.Lock()
// 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
atomic.StoreInt64(&m.owner, gid)
m.recursion = 1
}

func (m *ReentrantMutex) Unlock() {
gid := GetGoroutineID()
// 非持有锁的goroutine尝试释放锁,错误的使用
if atomic.LoadInt64(&m.owner) != gid {
panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
}
// 调用次数减1
m.recursion--
if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
return
}
// 此goroutine最后一次调用,需要释放锁
atomic.StoreInt64(&m.owner, -1)
m.Mutex.Unlock()
}

func main() {
var mutex = &ReentrantMutex{}
mutex.Lock()
mutex.Lock()
fmt.Println(111)
mutex.Unlock()
mutex.Unlock()
}

Go 原子操作有哪些?

Go atomic包是最轻量级的锁(也称无锁结构),可以在不形成临界区和创建互斥量的情况下完成并发安全的值替换操作,不过这个包只支持int32/int64/uint32/uint64/uintptr这几种数据类型的一些基础操作(增减、交换、载入、存储等)

概念

原子操作仅会由一个独立的CPU指令代表和完成。原子操作是无锁的,常常直接通过CPU指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。

使用场景

当我们想要对某个变量并发安全的修改,除了使用官方提供的 mutex,还可以使用 sync/atomic 包的原子操作,它能够保证对变量的读取或修改期间不被其他的协程所影响。

atomic 包提供的原子操作能够确保任一时刻只有一个goroutine对变量进行操作,善用 atomic 能够避免程序中出现大量的锁操作。

常见操作

  • 增减Add
  • 载入Load
  • 比较并交换CompareAndSwap
  • 交换Swap
  • 存储Store

atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法

下面将分别介绍这些操作:

增减操作

此类操作的前缀为 Add

1
2
3
4
5
6
7
8
9
func AddInt32(addr *int32, delta int32) (new int32)

func AddInt64(addr *int64, delta int64) (new int64)

func AddUint32(addr *uint32, delta uint32) (new uint32)

func AddUint64(addr *uint64, delta uint64) (new uint64)

func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

需要注意的是,第一个参数必须是指针类型的值,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作。

使用举例:

1
2
3
4
func add(addr *int64, delta int64) {
atomic.AddInt64(addr, delta) //加操作
fmt.Println("add opts: ", *addr)
}
载入操作

此类操作的前缀为 Load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func LoadInt32(addr *int32) (val int32)

func LoadInt64(addr *int64) (val int64)

func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

func LoadUint32(addr *uint32) (val uint32)

func LoadUint64(addr *uint64) (val uint64)

func LoadUintptr(addr *uintptr) (val uintptr)

// 特殊类型: Value类型,常用于配置变更
func (v *Value) Load() (x interface{}) {}

载入操作能够保证原子的读变量的值,当读取的时候,任何其他CPU操作都无法对该变量进行读写,其实现机制受到底层硬件的支持。

使用示例:

1
2
3
func load(addr *int64) {
fmt.Println("load opts: ", atomic.LoadInt64(&opts))
}
比较并交换

此类操作的前缀为 CompareAndSwap, 该操作简称 CAS,可以用来实现乐观锁

1
2
3
4
5
6
7
8
9
10
11
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

该操作在进行交换前首先确保变量的值未被更改,即仍然保持参数 old 所记录的值,满足此前提下才进行交换操作。CAS的做法类似操作数据库时常见的乐观锁机制。

需要注意的是,当有大量的goroutine 对变量进行读写操作时,可能导致CAS操作无法成功,这时可以利用for循环多次尝试。

使用示例:

1
2
3
4
5
6
func compareAndSwap(addr *int64, oldValue int64, newValue int64) {
if atomic.CompareAndSwapInt64(addr, oldValue, newValue) {
fmt.Println("cas opts: ", *addr)
return
}
}
交换

此类操作的前缀为 Swap

1
2
3
4
5
6
7
8
9
10
11
func SwapInt32(addr *int32, new int32) (old int32)

func SwapInt64(addr *int64, new int64) (old int64)

func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

func SwapUint32(addr *uint32, new uint32) (old uint32)

func SwapUint64(addr *uint64, new uint64) (old uint64)

func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

相对于CAS,明显此类操作更为暴力直接,并不管变量的旧值是否被改变,直接赋予新值然后返回背替换的值。

1
2
3
4
func swap(addr *int64, newValue int64) {
atomic.SwapInt64(addr, newValue)
fmt.Println("swap opts: ", *addr)
}
存储

此类操作的前缀为 Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func StoreInt32(addr *int32, val int32)

func StoreInt64(addr *int64, val int64)

func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

func StoreUint32(addr *uint32, val uint32)

func StoreUint64(addr *uint64, val uint64)

func StoreUintptr(addr *uintptr, val uintptr)

// 特殊类型: Value类型,常用于配置变更
func (v *Value) Store(x interface{})

此类操作确保了写变量的原子性,避免其他操作读到了修改变量过程中的脏数据。

1
2
3
4
func store(addr *int64, newValue int64) {
atomic.StoreInt64(addr, newValue)
fmt.Println("store opts: ", *addr)
}

Go 原子操作和锁的区别?

  • 原子操作由底层硬件支持,而锁是基于原子操作+信号量完成的。若实现相同的功能,前者通常会更有效率
  • 原子操作是单个指令的互斥操作;互斥锁/读写锁是一种数据结构,可以完成临界区(多个指令)的互斥操作,扩大原子操作的范围
  • 原子操作是无锁操作,属于乐观锁;说起锁的时候,一般属于悲观锁
  • 原子操作存在于各个指令/语言层级,比如“机器指令层级的原子操作”,“汇编指令层级的原子操作”,“Go语言层级的原子操作”等。
  • 锁也存在于各个指令/语言层级中,比如“机器指令层级的锁”,“汇编指令层级的锁”,“Go语言层级的锁”等

Goroutine

Go goroutine的底层实现原理?

概念

Goroutine可以理解为一种Go语言的协程(轻量级线程),是Go支持高并发的基础,属于用户态的线程,由Go runtime管理而不是操作系统。

底层数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type g struct {
goid int64 // 唯一的goroutine的ID
sched gobuf // goroutine切换时,用于保存g的上下文
stack stack // 栈
gopc // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
...
}

type gobuf struct {
sp uintptr // 栈指针位置
pc uintptr // 运行到的程序位置
g guintptr // 指向 goroutine
ret uintptr // 保存系统调用的返回值
...
}

type stack struct {
lo uintptr // 栈的下界内存地址
hi uintptr // 栈的上界内存地址
}

最终有一个 runtime.g 对象放入调度队列

状态流转

状态 含义
空闲中_Gidle G刚刚新建, 仍未初始化
待运行_Grunnable 就绪状态,G在运行队列中, 等待M取出并运行
运行中_Grunning M正在运行这个G, 这时候M会拥有一个P
系统调用中_Gsyscall M正在运行这个G发起的系统调用, 这时候M并不拥有P
等待中_Gwaiting G在等待某些条件完成, 这时候G不在运行也不在运行队列中(可能在channel的等待队列中)
已中止_Gdead G未被使用, 可能已执行完毕
栈复制中_Gcopystack G正在获取一个新的栈空间并把原来的内容复制过去(用于防止GC扫描)

img

创建

通过go关键字调用底层函数runtime.newproc()创建一个goroutine

当调用该函数之后,goroutine会被设置成runnable状态

1
2
3
4
5
6
func main() {
go func() {
fmt.Println("func routine")
}()
fmt.Println("main goroutine")
}

创建好的这个goroutine会新建一个自己的栈空间,同时在G的sched中维护栈地址与程序计数器这些信息。

每个 G 在被创建之后,都会被优先放入到本地队列中,如果本地队列已经满了,就会被放入到全局队列中。

运行

goroutine 本身只是一个数据结构,真正让 goroutine 运行起来的是调度器。Go 实现了一个用户态的调度器(GMP模型),这个调度器充分利用现代计算机的多核特性,同时让多个 goroutine 运行,同时 goroutine 设计的很轻量级,调度和上下文切换的代价都比较小。

img

调度时机:

  • 新起一个协程和协程执行完毕
  • 会阻塞的系统调用,比如文件io、网络io
  • channel、mutex等阻塞操作
  • time.sleep
  • 垃圾回收之后
  • 主动调用runtime.Gosched()
  • 运行过久或系统调用过久等等

每个 M 开始执行 P 的本地队列中的 G时,goroutine会被设置成running状态

如果某个 M 把本地队列中的G都执行完成之后,然后就会去全局队列中拿 G,这里需要注意,每次去全局队列拿 G 的时候,都需要上锁,避免同样的任务被多次拿。

如果全局队列都被拿完了,而当前 M 也没有更多的 G 可以执行的时候,它就会去其他 P 的本地队列中拿任务,这个机制被称之为 work stealing 机制,每次会拿走一半的任务,向下取整,比如另一个 P 中有 3 个任务,那一半就是一个任务。

当全局队列为空,M 也没办法从其他的 P 中拿任务的时候,就会让自身进入自选状态,等待有新的 G 进来。最多只会有 GOMAXPROCS 个 M 在自旋状态,过多 M 的自旋会浪费 CPU 资源。

阻塞

channel的读写操作、等待锁、等待网络数据、系统调用等都有可能发生阻塞,会调用底层函数runtime.gopark(),会让出CPU时间片,让调度器安排其它等待的任务运行,并在下次某个时候从该位置恢复执行。

当调用该函数之后,goroutine会被设置成waiting状态

唤醒

处于waiting状态的goroutine,在调用runtime.goready()函数之后会被唤醒,唤醒的goroutine会被重新放到M对应的上下文P对应的runqueue中,等待被调度。

当调用该函数之后,goroutine会被设置成runnable状态

退出

当goroutine执行完成后,会调用底层函数runtime.Goexit()

当调用该函数之后,goroutine会被设置成dead状态

Go goroutine和线程的区别?

goroutine 线程
内存占用 创建一个 goroutine 的栈内存消耗为 2 KB,实际运行过程中,如果栈空间不够用,会自动进行扩容 创建一个 线程 的栈内存消耗为 1 MB
创建和销毀 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。 线程 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池
切换 goroutines 切换只需保存三个寄存器:PC、SP、BP goroutine 的切换约为 200 ns,相当于 2400-3600 条指令。 当线程切换时,需要保存各种寄存器,以便恢复现场。 线程切换会消耗 1000-1500 ns,相当于 12000-18000 条指令。

Go goroutine泄露的场景?

泄露原因

  • Goroutine 内进行channel/mutex 等读写操作被一直阻塞。
  • Goroutine 内的业务逻辑进入死循环,资源一直无法释放。
  • Goroutine 内的业务逻辑进入长时间等待,有不断新增的 Goroutine 进入等待

泄露场景

如果输出的 goroutines 数量是在不断增加的,就说明存在泄漏

nil channel

channel 如果忘记初始化,那么无论你是读,还是写操作,都会造成阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
fmt.Println("before goroutines: ", runtime.NumGoroutine())
block1()
time.Sleep(time.Second * 1)
fmt.Println("after goroutines: ", runtime.NumGoroutine())
}

func block1() {
var ch chan int
for i := 0; i < 10; i++ {
go func() {
<-ch
}()
}
}

输出结果:

1
2
before goroutines:  1
after goroutines: 11

发送不接收

channel 发送数量 超过 channel接收数量,就会造成阻塞

1
2
3
4
5
6
7
8
func block2() {
ch := make(chan int)
for i := 0; i < 10; i++ {
go func() {
ch <- 1
}()
}
}

接收不发送

channel 接收数量 超过 channel发送数量,也会造成阻塞

1
2
3
4
5
6
7
8
func block3() {
ch := make(chan int)
for i := 0; i < 10; i++ {
go func() {
<-ch
}()
}
}

http request body未关闭

resp.Body.Close() 未被调用时,goroutine不会退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func requestWithNoClose() {
_, err := http.Get("https://www.baidu.com")
if err != nil {
fmt.Println("error occurred while fetching page, error: %s", err.Error())
}
}

func requestWithClose() {
resp, err := http.Get("https://www.baidu.com")
if err != nil {
fmt.Println("error occurred while fetching page, error: %s", err.Error())
return
}
defer resp.Body.Close()
}

func block4() {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
requestWithNoClose()
}()
}
}

var wg = sync.WaitGroup{}

func main() {
block4()
wg.Wait()
}

一般发起http请求时,需要确保关闭body

1
defer resp.Body.Close()

互斥锁忘记解锁

第一个协程获取 sync.Mutex 加锁了,但是他可能在处理业务逻辑,又或是忘记 Unlock 了。

因此导致后面的协程想加锁,却因锁未释放被阻塞了

1
2
3
4
5
6
7
8
func block5() {
var mutex sync.Mutex
for i := 0; i < 10; i++ {
go func() {
mutex.Lock()
}()
}
}

sync.WaitGroup使用不当

由于 wg.Add 的数量与 wg.Done 数量并不匹配,因此在调用 wg.Wait 方法后一直阻塞等待

1
2
3
4
5
6
7
8
9
10
func block6() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
go func() {
wg.Add(2)
wg.Done()
wg.Wait()
}()
}
}

如何排查

单个函数:调用 runtime.NumGoroutine 方法来打印 执行代码前后Goroutine 的运行数量,进行前后比较,就能知道有没有泄露了。

生产/测试环境:使用PProf实时监测Goroutine的数量

Go 如何查看正在执行的goroutine数量?

程序中引入pprof package

在程序中引入pprof package:

1
import _ "net/http/pprof"

程序中开启HTTP监听服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"net/http"
_ "net/http/pprof"
)

func main() {

for i := 0; i < 100; i++ {
go func() {
select {}
}()
}

go func() {
http.ListenAndServe("localhost:6060", nil)
}()

select {}
}

分析goroutine文件

在命令行下执行:

1
go tool pprof -http=:1248 http://127.0.0.1:6060/debug/pprof/goroutine

会自动打开浏览器页面如下图所示

img

在图中可以清晰的看到goroutine的数量以及调用关系,可以看到有103个goroutine

Go 如何控制并发的goroutine数量?

为什么要控制goroutine并发的数量?

在开发过程中,如果不对goroutine加以控制而进行滥用的话,可能会导致服务整体崩溃。比如耗尽系统资源导致程序崩溃,或者CPU使用率过高导致系统忙不过来。

用什么方法控制goroutine并发的数量?

有缓冲channel

利用缓冲满时发送阻塞的特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"runtime"
"time"
)

var wg = sync.WaitGroup{}

func main() {
// 模拟用户请求数量
requestCount := 10
fmt.Println("goroutine_num", runtime.NumGoroutine())
// 管道长度即最大并发数
ch := make(chan bool, 3)
for i := 0; i < requestCount; i++ {
wg.Add(1)
ch <- true
go Read(ch, i)
}

wg.Wait()
}

func Read(ch chan bool, i int) {
fmt.Printf("goroutine_num: %d, go func: %d\n", runtime.NumGoroutine(), i)
<-ch
wg.Done()
}

输出结果:默认最多不超过3(4-1)个goroutine并发执行

1
2
3
4
5
6
7
8
9
10
11
goroutine_num 1
goroutine_num: 4, go func: 1
goroutine_num: 4, go func: 3
goroutine_num: 4, go func: 2
goroutine_num: 4, go func: 0
goroutine_num: 4, go func: 4
goroutine_num: 4, go func: 5
goroutine_num: 4, go func: 6
goroutine_num: 4, go func: 8
goroutine_num: 4, go func: 9
goroutine_num: 4, go func: 7

无缓冲channel

任务发送和执行分离,指定消费者并发协程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"runtime"
"sync"
)

var wg = sync.WaitGroup{}

func main() {
// 模拟用户请求数量
requestCount := 10
fmt.Println("goroutine_num", runtime.NumGoroutine())
ch := make(chan bool)
for i := 0; i < 3; i++ {
go Read(ch, i)
}

for i := 0; i < requestCount; i++ {
wg.Add(1)
ch <- true
}

wg.Wait()
}

func Read(ch chan bool, i int) {
for _ = range ch {
fmt.Printf("goroutine_num: %d, go func: %d\n", runtime.NumGoroutine(), i)
wg.Done()
}
}

调度模型

Go 线程实现模型?

Go实现的是两级线程模型(M:N),准确的说是GMP模型,是对两级线程模型的改进实现,使它能够更加灵活地进行线程之间的调度。

背景

含义 缺点
单进程时代 每个程序就是一个进程,直到一个程序运行完,才能进行下一个进程 1. 无法并发,只能串行 2. 进程阻塞所带来的 CPU 时间浪费
多进程/线程时代 一个线程阻塞, cpu 可以立刻切换到其他线程中去执行 1. 进程/线程占用内存高 2. 进程/线程上下文切换成本高
协程时代 协程(用户态线程)绑定线程(内核态线程),cpu调度线程执行 1. 实现起来较复杂,协程和线程的绑定依赖调度器算法

线程 -> CPU 由 操作系统 调度,协程 -> 线程 由Go调度器来调度,协程与线程的映射关系有三种线程模型

三种线程模型

线程实现模型主要分为:内核级线程模型用户级线程模型两级线程模型,他们的区别在于用户线程与内核线程之间的对应关系。

内核级线程模型(1:1)

1个用户线程对应1个内核线程,这种最容易实现,协程的调度都由 CPU 完成了

img

优点:

  • 实现起来最简单
  • 能够利用多核
  • 如果进程中的一个线程被阻塞,不会阻塞其他线程,是能够切换同一进程内的其他线程继续执行

缺点:

  • 上下文切换成本高,创建、删除和切换都由 CPU 完成

用户级线程模型(N:1)

1个进程中的所有线程对应1个内核线程

img

优点:

  • 上下文切换成本低,在用户态即可完成协程切换

缺点:

  • 无法利用多核
  • 一旦协程阻塞,造成线程阻塞,本线程的其它协程无法执行

两级线程模型(M:N)

M个线程对应N个内核线程

img

优点:

  • 能够利用多核
  • 上下文切换成本低
  • 如果进程中的一个线程被阻塞,不会阻塞其他线程,是能够切换同一进程内的其他线程继续执行

缺点:

  • 实现起来最复杂

Go GMP和GM模型?

什么才是一个好的调度器?

能在适当的时机将合适的协程分配到合适的位置,保证公平和效率。

Go采用了GMP模型(对两级线程模型的改进实现),使它能够更加灵活地进行线程之间的调度。

GMP模型

GMP是Go运行时调度层面的实现,包含4个重要结构,分别是G、M、P、Sched

img

G(Goroutine):代表Go 协程Goroutine,存储了 Goroutine 的执行栈信息、Goroutine 状态以及 Goroutine 的任务函数等。G的数量无限制,理论上只受内存的影响,创建一个 G 的初始栈大小为2-4K,配置一般的机器也能简简单单开启数十万个 Goroutine ,而且Go语言在 G 退出的时候还会把 G 清理之后放到 P 本地或者全局的闲置列表 gFree 中以便复用。

M(Machine): Go 对操作系统线程(OS thread)的封装,可以看作操作系统内核线程,想要在 CPU 上执行代码必须有线程,通过系统调用 clone 创建。M在绑定有效的 P 后,进入一个调度循环,而调度循环的机制大致是从 P 的本地运行队列以及全局队列中获取 G,切换到 G 的执行栈上并执行 G 的函数,调用 goexit 做清理工作并回到 M,如此反复。M 并不保留 G 状态,这是 G 可以跨 M 调度的基础。M的数量有限制,默认数量限制是 10000,可以通过 debug.SetMaxThreads() 方法进行设置,如果有M空闲,那么就会回收或者睡眠。

P(Processor):虚拟处理器,M执行G所需要的资源和上下文,只有将 P 和 M 绑定,才能让 P 的 runq 中的 G 真正运行起来。P 的数量决定了系统内最大可并行的 G 的数量,P的数量受本机的CPU核数影响,可通过环境变量$GOMAXPROCS或在runtime.GOMAXPROCS()来设置,默认为CPU核心数。

Sched:调度器结构,它维护有存储M和G的全局队列,以及调度器的一些状态信息

G M P
数量限制 无限制,受机器内存影响 有限制,默认最多10000 有限制,最多GOMAXPROCS个
创建时机 go func 当没有足够的M来关联P并运行其中的可运行的G时会请求创建新的M 在确定了P的最大数量n后,运行时系统会根据这个数量创建个P

核心数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//src/runtime/runtime2.go
type g struct {
goid int64 // 唯一的goroutine的ID
sched gobuf // goroutine切换时,用于保存g的上下文
stack stack // 栈
gopc // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
...
}

type p struct {
lock mutex
id int32
status uint32 // one of pidle/prunning/...

// Queue of runnable goroutines. Accessed without lock.
runqhead uint32 // 本地队列队头
runqtail uint32 // 本地队列队尾
runq [256]guintptr // 本地队列,大小256的数组,数组往往会被都读入到缓存中,对缓存友好,效率较高
runnext guintptr // 下一个优先执行的goroutine(一定是最后生产出来的),为了实现局部性原理,runnext中的G永远会被最先调度执行
...
}

type m struct {
g0 *g
// 每个M都有一个自己的G0,不指向任何可执行的函数,在调度或系统调用时,M会切换到G0,使用G0的栈空间来调度
curg *g
// 当前正在执行的G
...
}

type schedt struct {
...
runq gQueue // 全局队列,链表(长度无限制)
runqsize int32 // 全局队列长度
...
}

GMP模型的实现算是Go调度器的一大进步,但调度器仍然有一个令人头疼的问题,那就是不支持抢占式调度,这导致一旦某个 G 中出现死循环的代码逻辑,那么 G 将永久占用分配给它的 P 和 M,而位于同一个 P 中的其他 G 将得不到调度,出现“饿死”的情况。

当只有一个 P(GOMAXPROCS=1)时,整个 Go 程序中的其他 G 都将“饿死”。于是在 Go 1.2 版本中实现了基于协作的“抢占式”调度,在Go 1.14 版本中实现了基于信号的“抢占式”调度。

GM模型

Go早期是GM模型,没有P组件

img

**GM调度存在的问题: **

  1. 全局队列的锁竞争,当 M 从全局队列中添加或者获取 G 的时候,都需要获取队列锁,导致激烈的锁竞争
  2. M 转移 G 增加额外开销,当 M1 在执行 G1 的时候, M1 创建了 G2,为了继续执行 G1,需要把 G2 保存到全局队列中,无法保证G2是被M1处理。因为 M1 原本就保存了 G2 的信息,所以 G2 最好是在 M1 上执行,这样的话也不需要转移G到全局队列和线程上下文切换
  3. 线程使用效率不能最大化,没有work-stealinghand-off 机制

计算机科学领域的任何问题都可以通过增加一个间接的中间层来解决,为了解决这一的问题 go 从 1.1 版本引入P,在运行时系统的时候加入 P 对象,让 P 去管理这个 G 对象,M 想要运行 G,必须绑定 P,才能运行 P 所管理 的 G

Go 调度原理?

goroutine调度的本质就是将 Goroutine (G)按照一定算法放到CPU上去执行。

CPU感知不到Goroutine,只知道内核线程,所以需要Go调度器将协程调度到内核线程上面去,然后操作系统调度器将内核线程放到CPU上去执行

M是对内核级线程的封装,所以Go调度器的工作就是将G分配到M

Go 调度器的实现不是一蹴而就的,它的调度模型与算法也是几经演化,从最初的 GM 模型、到 GMP模型,从不支持抢占,到支持协作式抢占,再到支持基于信号的异步抢占,经历了不断地优化与打磨。

设计思想

  • 线程复用(work stealing 机制hand off 机制
  • 利用并行(利用多核CPU)
  • 抢占调度(解决公平性问题)

调度对象

Go 调度器

Go 调度器是属于Go runtime中的一部分,Go runtime负责实现Go的并发调度垃圾回收内存堆栈管理等关键功能

被调度对象

G的来源

  • P的runnext(只有1个G,局部性原理,永远会被最先调度执行)
  • P的本地队列(数组,最多256个G)
  • 全局G队列(链表,无限制)
  • 网络轮询器network poller(存放网络调用被阻塞的G)

P的来源

  • 全局P队列(数组,GOMAXPROCS个P)

M的来源

  • 休眠线程队列(未绑定P,长时间休眠会等待GC回收销毁)
  • 运行线程(绑定P,指向P中的G)
  • 自旋线程(绑定P,指向M的G0)

其中运行线程数 + 自旋线程数 <= P的数量(GOMAXPROCS),M个数 >= P个数

调度流程

协程的调度采用了生产者-消费者模型,实现了用户任务与调度器的解耦

img

img

生产端我们开启的每个协程都是一个计算任务,这些任务会被提交给 go 的 runtime。如果计算任务非常多,有成千上万个,那么这些任务是不可能同时被立刻执行的,所以这个计算任务一定会被先暂存起来,一般的做法是放到内存的队列中等待被执行。

G的生命周期:G 从创建、保存、被获取、调度和执行、阻塞、销毁,步骤如下:

步骤 1:创建 G,关键字 go func() 创建 G 步骤 2:保存 G,创建的 G 优先保存到本地队列 P,如果 P 满了,则会平衡部分P到全局队列中

步骤3唤醒或者新建M执行任务,进入调度循环(步骤4,5,6)

步骤 4:M 获取 G,M首先从P的本地队列获取 G,如果 P为空,则从全局队列获取 G,如果全局队列也为空,则从另一个本地队列偷取一半数量的 G(负载均衡),这种从其它P偷的方式称之为 work stealing

步骤 5:M 调度和执行 G,M调用 G.func() 函数执行 G

  • 如果 M在执行 G 的过程发生系统调用阻塞(同步),会阻塞G和M(操作系统限制),此时P会和当前M解绑,并寻找新的M,如果没有空闲的M就会新建一个M ,接管正在阻塞G所属的P,接着继续执行 P中其余的G,这种阻塞后释放P的方式称之为hand off。当系统调用结束后,这个G会尝试获取一个空闲的P执行,优先获取之前绑定的P,并放入到这个P的本地队列,如果获取不到P,那么这个线程M变成休眠状态,加入到空闲线程中,然后这个G会被放入到全局队列中。
  • 如果M在执行G的过程发生网络IO等操作阻塞时(异步),阻塞G,不会阻塞M。M会寻找P中其它可执行的G继续执行,G会被网络轮询器network poller 接手,当阻塞的G恢复后,G1从network poller 被移回到P的 LRQ 中,重新进入可执行状态。异步情况下,通过调度,Go scheduler 成功地将 I/O 的任务转变成了 CPU 任务,或者说将内核级别的线程切换转变成了用户级别的 goroutine 切换,大大提高了效率。

步骤6:M执行完G后清理现场,重新进入调度循环(将M上运⾏的goroutine切换为G0,G0负责调度时协程的切换)

其中步骤2中保存 G的详细流程如下:

  • 执行 go func 的时候,主线程 M0 会调用 newproc()生成一个 G 结构体,这里会先选定当前 M0 上的 P 结构
  • 每个协程 G 都会被尝试先放到 P 中的 runnext,若 runnext 为空则放到 runnext 中,生产结束
  • 若 runnext 满,则将原来 runnext 中的 G 踢到本地队列中,将当前 G 放到 runnext 中,生产结束
  • 若本地队列也满了,则将本地队列中的 G 拿出一半,放到全局队列中,生产结束。

img

调度时机

什么时候进行调度(执行/切换)?

在以下情形下,会切换正在执行的goroutine

  • 抢占式调度
    • sysmon 检测到协程运行过久(比如sleep,死循环)
      • 切换到g0,进入调度循环
  • 主动调度
    • 新起一个协程和协程执行完毕
      • 触发调度循环
    • 主动调用runtime.Gosched()
      • 切换到g0,进入调度循环
    • 垃圾回收之后
      • stw之后,会重新选择g开始执行
  • 被动调度
    • 系统调用(比如文件IO)阻塞(同步)
      • 阻塞G和M,P与M分离,将P交给其它M绑定,其它M执行P的剩余G
    • 网络IO调用阻塞(异步)
      • 阻塞G,G移动到NetPoller,M执行P的剩余G
    • atomic/mutex/channel等阻塞(异步)
      • 阻塞G,G移动到channel的等待队列中,M执行P的剩余G

调度策略

使用什么策略来挑选下一个goroutine执行?

由于 P 中的 G 分布在 runnext、本地队列、全局队列、网络轮询器中,则需要挨个判断是否有可执行的 G,大体逻辑如下:

  • 每执行61次调度循环,从全局队列获取G,若有则直接返回
  • 从P 上的 runnext 看一下是否有 G,若有则直接返回
  • 从P 上的 本地队列 看一下是否有 G,若有则直接返回
  • 上面都没查找到时,则去全局队列、网络轮询器查找或者从其他 P 中窃取,一直阻塞直到获取到一个可用的 G 为止

源码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func schedule() {
_g_ := getg()
var gp *g
var inheritTime bool
...
if gp == nil {
// 每执行61次调度循环会看一下全局队列。为了保证公平,避免全局队列一直无法得到执行的情况,当全局运行队列中有待执行的G时,通过schedtick保证有一定几率会从全局的运行队列中查找对应的Goroutine;
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// 先尝试从P的runnext和本地队列查找G
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
// 仍找不到,去全局队列中查找。还找不到,要去网络轮询器中查找是否有G等待运行;仍找不到,则尝试从其他P中窃取G来执行。
gp, inheritTime = findrunnable() // blocks until work is available
// 这个函数是阻塞的,执行到这里一定会获取到一个可执行的G
}
...
// 调用execute,继续调度循环
execute(gp, inheritTime)
}

从全局队列查找时,如果要所有 P 平分全局队列中的 G,每个 P 要分得多少个,这里假设会分得 n 个。然后把这 n 个 G,转移到当前 G 所在 P 的本地队列中去。但是最多不能超过 P 本地队列长度的一半(即 128)。这样做的目的是,如果下次调度循环到来的时候,就不必去加锁到全局队列中在获取一次 G 了,性能得到了很好的保障。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func globrunqget(_p_ *p, max int32) *g {
...
// gomaxprocs = p的数量
// sched.runqsize是全局队列长度
// 这里n = 全局队列的G平分到每个P本地队列上的数量 + 1
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
// 平分后的数量n不能超过本地队列长度的一半,也就是128
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}

// 执行将G从全局队列中取n个分到当前P本地队列的操作
sched.runqsize -= n

gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}

从其它P查找时,会偷一半的G过来放到当前P的本地队列

Go work stealing 机制?

概念

当线程M⽆可运⾏的G时,尝试从其他M绑定的P偷取G,减少空转,提高了线程利用率(避免闲着不干活)。

当从本线程绑定 P 本地 队列、全局G队列、netpoller都找不到可执行的 g,会从别的 P 里窃取G并放到当前P上面。

netpoller 中拿到的G是_Gwaiting状态( 存放的是因为网络IO被阻塞的G),从其它地方拿到的G是_Grunnable状态

从全局队列取的G数量:N = min(len(GRQ)/GOMAXPROCS + 1, len(GRQ/2)) (根据GOMAXPROCS负载均衡)

从其它P本地队列窃取的G数量:N = len(LRQ)/2(平分)

img

窃取流程

源码见runtime/proc.go stealWork函数,窃取流程如下,如果经过多次努力一直找不到需要运行的goroutine则调用stopm进入睡眠状态,等待被其它工作线程唤醒。

  1. 选择要窃取的P
  2. 从P中偷走一半G
选择要窃取的P

窃取的实质就是遍历allp中的所有p,查看其运行队列是否有goroutine,如果有,则取其一半到当前工作线程的运行队列

为了保证公平性,遍历allp时并不是固定的从allp[0]即第一个p开始,而是从随机位置上的p开始,而且遍历的顺序也随机化了,并不是现在访问了第i个p下一次就访问第i+1个p,而是使用了一种伪随机的方式遍历allp中的每个p,防止每次遍历时使用同样的顺序访问allp中的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
offset := uint32(random()) % nprocs
coprime := 随机选取一个小于nprocs且与nprocs互质的数
const stealTries = 4 // 最多重试4次
for i := 0; i < stealTries; i++ {
for i := 0; i < nprocs; i++ {
p := allp[offset]
从p的运行队列偷取goroutine
if 偷取成功 {
break
}
offset += coprime
offset = offset % nprocs
}
}

可以看到只要随机数不一样,偷取p的顺序也不一样,但可以保证经过nprocs次循环,每个p都会被访问到。

从P中偷走一半G

源码见runtime/proc.go runqsteal函数:

挑选出盗取的对象p之后,则调用runqsteal盗取p的运行队列中的goroutine,runqsteal函数再调用runqgrap从p的本地队列尾部批量偷走一半的g

为啥是偷一半的g,可以理解为负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h //计算队列中有多少个goroutine
n = n - n/2 //取队列中goroutine个数的一半
if n == 0 {
......
return ......
}
return n
}
}

Go hand off 机制?

概念

也称为P分离机制,当本线程 M 因为 G 进行的系统调用阻塞时,线程释放绑定的 P,把 P 转移给其他空闲的 M 执行,也提高了线程利用率(避免站着茅坑不拉shi)。

分离流程

当前线程M阻塞时,释放P,给其它空闲的M处理

img

Go 抢占式调度?

在1.2版本之前,Go的调度器仍然不支持抢占式调度,程序只能依靠Goroutine主动让出CPU资源才能触发调度,这会引发一些问题,比如:

  • 某些 Goroutine 可以长时间占用线程,造成其它 Goroutine 的饥饿
  • 垃圾回收器是需要stop the world的,如果垃圾回收器想要运行了,那么它必须先通知其它的goroutine停下来,这会造成较长时间的等待时间

为解决这个问题:

  • Go 1.2 中实现了基于协作的“抢占式”调度
  • Go 1.14 中实现了基于信号的“抢占式”调度

基于协作的抢占式调度

协作式:大家都按事先定义好的规则来,比如:一个goroutine执行完后,退出,让出p,然后下一个goroutine被调度到p上运行。这样做的缺点就在于 是否让出p的决定权在groutine自身。一旦某个g不主动让出p或执行时间较长,那么后面的goroutine只能等着,没有方法让前者让出p,导致延迟甚至饿死。

非协作式: 就是由runtime来决定一个goroutine运行多长时间,如果你不主动让出,对不起,我有手段可以抢占你,把你踢出去,让后面的goroutine进来运行。

基于协作的抢占式调度流程:

  • 编译器会在调用函数前插入 runtime.morestack,让运行时有机会在这段代码中检查是否需要执行抢占调度
  • Go语言运行时会在垃圾回收暂停程序、系统监控发现 Goroutine 运行超过 10ms,那么会在这个协程设置一个抢占标记
  • 当发生函数调用时,可能会执行编译器插入的 runtime.morestack,它调用的 runtime.newstack会检查抢占标记,如果有抢占标记就会触发抢占让出cpu,切到调度主协程里

这种解决方案只能说局部解决了“饿死”问题,只在有函数调用的地方才能插入“抢占”代码(埋点),对于没有函数调用而是纯算法循环计算的 G,Go 调度器依然无法抢占。

比如,死循环等并没有给编译器插入抢占代码的机会,以下程序在 go 1.14 之前的 go版本中,运行后会一直卡住,而不会打印 I got scheduled!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"runtime"
"time"
)

func main() {
runtime.GOMAXPROCS(1)
go func() {
for {
}
}()

time.Sleep(time.Second)
fmt.Println("I got scheduled!")
}

为了解决这些问题,Go 在 1.14 版本中增加了对非协作的抢占式调度的支持,这种抢占式调度是基于系统信号的,也就是通过向线程发送信号的方式来抢占正在运行的 Goroutine

基于信号的抢占式调度

真正的抢占式调度是基于信号完成的,所以也称为“异步抢占”。不管协程有没有意愿主动让出 cpu 运行权,只要某个协程执行时间过长,就会发送信号强行夺取 cpu 运行权。

  • M 注册一个 SIGURG 信号的处理函数:sighandler
  • sysmon启动后会间隔性的进行监控,最长间隔10ms,最短间隔20us。如果发现某协程独占P超过10ms,会给M发送抢占信号
  • M 收到信号后,内核执行 sighandler 函数把当前协程的状态从_Grunning正在执行改成 _Grunnable可执行,把抢占的协程放到全局队列里,M继续寻找其他 goroutine 来运行
  • 被抢占的 G 再次调度过来执行时,会继续原来的执行流

抢占分为_Prunning_Psyscall_Psyscall抢占通常是由于阻塞性系统调用引起的,比如磁盘io、cgo。_Prunning抢占通常是由于一些类似死循环的计算逻辑引起的。

Go 如何查看运行时调度信息?

有 2 种方式可以查看一个程序的调度GMP信息,分别是go tool trace和GODEBUG

trace.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"os"
"runtime/trace"
"time"
)

func main() {

//创建trace文件
f, err := os.Create("trace.out")
if err != nil {
panic(err)
}

defer f.Close()

//启动trace goroutine
err = trace.Start(f)
if err != nil {
panic(err)
}
defer trace.Stop()

//main
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
fmt.Println("Hello World")
}
}

go tool trace

启动可视化界面:

1
2
3
4
5
go run trace.go
go tool trace trace.out
2022/04/22 10:44:11 Parsing trace...
2022/04/22 10:44:11 Splitting trace...
2022/04/22 10:44:11 Opening browser. Trace viewer is listening on http://127.0.0.1:35488

1.打开 http://127.0.0.1:35488 查看可视化界面:

img

点击 view trace 能够看见可视化的调度流程:

img

img

一共有2个G在程序中,一个是特殊的G0,是每个M必须有的一个初始化的G,另外一个是G1 main goroutine (执行 main 函数的协程),在一段时间内处于可运行和运行的状态。

2. 点击 Threads 那一行可视化的数据条,我们会看到M详细的信息

img

一共有2个 M 在程序中,一个是特殊的 M0,用于初始化使用,另外一个是用于执行G1的M1

3. 点击 Proc 那一行可视化的数据条,我们会看到P上正在运行goroutine详细的信息

一共有3个 P 在程序中,分别是P0、P1、P2

img

点击具体的 Goroutine 行为后可以看到其相关联的详细信息:

1
2
3
4
5
6
7
8
9
10
Start:开始时间
Wall Duration:持续时间
Self Time:执行时间
Start Stack Trace:开始时的堆栈信息
End Stack Trace:结束时的堆栈信息
Incoming flow:输入流
Outgoing flow:输出流
Preceding events:之前的事件
Following events:之后的事件
All connected:所有连接的事件

GODEBUG

GODEBUG 变量可以控制运行时内的调试变量。查看调度器信息,将会使用如下两个参数:

  • schedtrace:设置 schedtrace=X 参数可以使运行时在每 X 毫秒发出一行调度器的摘要信息到标准 err 输出中。
  • scheddetail:设置 schedtrace=Xscheddetail=1 可以使运行时在每 X 毫秒发出一次详细的多行信息,信息内容主要包括调度程序、处理器、OS 线程 和 Goroutine 的状态。

查看基本信息

1
2
go build trace.go
GODEBUG=schedtrace=1000 ./trace
1
2
3
4
5
6
7
8
9
10
11
SCHED 0ms: gomaxprocs=8 idleprocs=6 threads=4 spinningthreads=1 idlethreads=0 runqueue=0 [1 0 0 0 0 0 0 0]
Hello World
SCHED 1010ms: gomaxprocs=8 idleprocs=8 threads=4 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0 0 0 0 0]
Hello World
SCHED 2014ms: gomaxprocs=8 idleprocs=8 threads=4 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0 0 0 0 0]
Hello World
SCHED 3024ms: gomaxprocs=8 idleprocs=8 threads=4 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0 0 0 0 0]
Hello World
SCHED 4027ms: gomaxprocs=8 idleprocs=8 threads=4 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0 0 0 0 0]
Hello World
SCHED 5029ms: gomaxprocs=8 idleprocs=7 threads=4 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0 0 0 0 0]

sched:每一行都代表调度器的调试信息,后面提示的毫秒数表示启动到现在的运行时间,输出的时间间隔受 schedtrace 的值影响。

gomaxprocs:当前的 CPU 核心数(GOMAXPROCS 的当前值)。

idleprocs:空闲的处理器数量,后面的数字表示当前的空闲数量。

threads:OS 线程数量,后面的数字表示当前正在运行的线程数量。

spinningthreads:自旋状态的 OS 线程数量。

idlethreads:空闲的线程数量。

runqueue:全局队列中中的 Goroutine 数量,而后面的[0 0 0 0 0 0 0 0] 则分别代表这 8 个 P 的本地队列正在运行的 Goroutine 数量。

查看详细信息

1
2
go build trace.go
GODEBUG=scheddetail=1,schedtrace=1000 ./trace
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SCHED 0ms: gomaxprocs=8 idleprocs=6 threads=4 spinningthreads=1 idlethreads=0 runqueue=0 gcwaiting=0 nmidlelocked=0 stopwait=0 sysmonwait=0
P0: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=1 gfreecnt=0 timerslen=0
P1: status=1 schedtick=0 syscalltick=0 m=2 runqsize=0 gfreecnt=0 timerslen=0
P2: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 timerslen=0
P3: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 timerslen=0
P4: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 timerslen=0
P5: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 timerslen=0
P6: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 timerslen=0
P7: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 timerslen=0
M3: p=0 curg=-1 mallocing=0 throwing=0 preemptoff= locks=1 dying=0 spinning=false blocked=false lockedg=-1
M2: p=1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=2 dying=0 spinning=false blocked=false lockedg=-1
M1: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=2 dying=0 spinning=false blocked=false lockedg=-1
M0: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=1 dying=0 spinning=false blocked=false lockedg=1
G1: status=1(chan receive) m=-1 lockedm=0
G2: status=1() m=-1 lockedm=-1
G3: status=1() m=-1 lockedm=-1
G4: status=4(GC scavenge wait) m=-1 lockedm=-1

G

1
2
3
status:G 的运行状态。
m:隶属哪一个 M。
lockedm:是否有锁定 M。

G 的运行状态共涉及如下 9 种状态:

状态 含义
_Gidle 0 刚刚被分配,还没有进行初始化。
_Grunnable 1 已经在运行队列中,还没有执行用户代码。
_Grunning 2 不在运行队列里中,已经可以执行用户代码,此时已经分配了 M 和 P。
_Gsyscall 3 正在执行系统调用,此时分配了 M。
_Gwaiting 4 在运行时被阻止,没有执行用户代码,也不在运行队列中,此时它正在某处阻塞等待中。
_Gmoribund_unused 5 尚未使用,但是在 gdb 中进行了硬编码。
_Gdead 6 尚未使用,这个状态可能是刚退出或是刚被初始化,此时它并没有执行用户代码,有可能有也有可能没有分配堆栈。
_Genqueue_unused 7 尚未使用。
_Gcopystack 8 正在复制堆栈,并没有执行用户代码,也不在运行队列中。

M

1
2
3
4
5
6
7
p:隶属哪一个 P。
curg:当前正在使用哪个 G。
runqsize:运行队列中的 G 数量。
gfreecnt:可用的G(状态为 Gdead)。
mallocing:是否正在分配内存。
throwing:是否抛出异常。
preemptoff:不等于空字符串的话,保持 curg 在这个 m 上运行。

P

1
2
3
4
5
6
status:P 的运行状态。
schedtick:P 的调度次数。
syscalltick:P 的系统调用次数。
m:隶属哪一个 M。
runqsize:运行队列中的 G 数量。
gfreecnt:可用的G(状态为 Gdead)
状态 含义
_Pidle 0 刚刚被分配,还没有进行进行初始化。
_Prunning 1 当 M 与 P 绑定调用 acquirep 时,P 的状态会改变为 _Prunning。
_Psyscall 2 正在执行系统调用。
_Pgcstop 3 暂停运行,此时系统正在进行 GC,直至 GC 结束后才会转变到下一个状态阶段。
_Pdead 4 废弃,不再使用。

内存管理

Go 内存分配机制?

Go语言内置运行时(就是runtime),抛弃了传统的内存分配方式,改为自主管理。这样可以自主地实现更好的内存使用模式,比如内存池、预分配等等。这样,不会每次内存分配都需要进行系统调用。

设计思想

  • 内存分配算法采用Google的TCMalloc算法,每个线程都会自行维护一个独立的内存池,进行内存分配时优先从该内存池中分配,当内存池不足时才会向加锁向全局内存池申请,减少系统调用并且避免不同线程对全局内存池的锁竞争
  • 把内存切分的非常的细小,分为多级管理,以降低锁的粒度
  • 回收对象内存时,并没有将其真正释放掉,只是放回预先分配的大块内存中,以便复用。只有内存闲置过多的时候,才会尝试归还部分内存给操作系统,降低整体开销

分配组件

Go的内存管理组件主要有:mspanmcachemcentralmheap

img

内存管理单元:mspan

mspan是 内存管理的基本单元,该结构体中包含 nextprev 两个字段,它们分别指向了前一个和后一个mspan,每个mspan 都管理 npages 个大小为 8KB 的页,一个span 是由多个page组成的,这里的页不是操作系统中的内存页,它们是操作系统内存页的整数倍。

page是内存存储的基本单元,“对象”放到page

1
2
3
4
5
6
7
8
9
10
type mspan struct {
next *mspan // 后指针
prev *mspan // 前指针
startAddr uintptr // 管理页的起始地址,指向page
npages uintptr // 页数
spanclass spanClass // 规格
...
}

type spanClass uint8

Go有68种不同大小的spanClass,用于小对象的分配

1
2
const _NumSizeClasses = 68
var class_to_size = [_NumSizeClasses]uint16{0, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 288, 320, 352, 384, 416, 448, 480, 512, 576, 640, 704, 768, 896, 1024, 1152, 1280, 1408, 1536,1792, 2048, 2304, 2688, 3072, 3200, 3456, 4096, 4864, 5376, 6144, 6528, 6784, 6912, 8192, 9472, 9728, 10240, 10880, 12288, 13568, 14336, 16384, 18432, 19072, 20480, 21760, 24576, 27264, 28672, 32768}

如果按照序号为1的spanClass(对象规格为8B)分配,每个span占用堆的字节数:8k,mspan可以保存1024个对象

如果按照序号为2的spanClass(对象规格为16B)分配,每个span占用堆的字节数:8k,mspan可以保存512个对象

如果按照序号为67的spanClass(对象规格为32K)分配,每个span占用堆的字节数:32k,mspan可以保存1个对象

img

字段含义:

  • class: class ID,每个span结构中都有一个class ID, 表示该span可处理的对象类型
  • bytes/obj:该class代表对象的字节数
  • bytes/span:每个span占用堆的字节数,也即页数*页大小
  • objects: 每个span可分配的对象个数,也即(bytes/spans)/(bytes/obj)
  • waste bytes: 每个span产生的内存碎片,也即(bytes/spans)%(bytes/obj)

大于32k的对象出现时,会直接从heap分配一个特殊的span,这个特殊的span的类型(class)是0, 只包含了一个大对象

线程缓存:mcache

mcache管理线程在本地缓存的mspan,每个goroutine绑定的P都有一个mcache字段

1
2
3
4
5
6
type mcache struct {
alloc [numSpanClasses]*mspan
}

_NumSizeClasses = 68
numSpanClasses = _NumSizeClasses << 1

mcacheSpan Classes作为索引管理多个用于分配的mspan,它包含所有规格的mspan。它是_NumSizeClasses的2倍,也就是68*2=136,其中*2是将spanClass分成了有指针和没有指针两种,方便与垃圾回收。对于每种规格,有2个mspan,一个mspan不包含指针,另一个mspan则包含指针。对于无指针对象的mspan在进行垃圾回收的时候无需进一步扫描它是否引用了其他活跃的对象。

mcache在初始化的时候是没有任何mspan资源的,在使用过程中会动态地从mcentral申请,之后会缓存下来。当对象小于等于32KB大小时,使用mcache的相应规格的mspan进行分配。

中心缓存:mcentral

mcentral管理全局的mspan供所有线程使用,全局mheap变量包含central字段,每个 mcentral 结构都维护在mheap结构内

1
2
3
4
5
6
type mcentral struct {
spanclass spanClass // 指当前规格大小

partial [2]spanSet // 有空闲object的mspan列表
full [2]spanSet // 没有空闲object的mspan列表
}

每个mcentral管理一种spanClass的mspan,并将有空闲空间和没有空闲空间的mspan分开管理。partial和 full的数据类型为spanSet,表示 mspans集,可以通过pop、push来获得mspans

1
2
3
4
5
6
7
8
type spanSet struct {
spineLock mutex
spine unsafe.Pointer // 指向[]span的指针
spineLen uintptr // Spine array length, accessed atomically
spineCap uintptr // Spine array cap, accessed under lock

index headTailIndex // 前32位是头指针,后32位是尾指针
}

简单说下mcachemcentral获取和归还mspan的流程:

  • 获取; 加锁,从partial链表找到一个可用的mspan;并将其从partial链表删除;将取出的mspan加入到full链表;将mspan返回给工作线程,解锁。
  • 归还; 加锁,将mspanfull链表删除;将mspan加入到partial链表,解锁。
页堆:mheap

mheap管理Go的所有动态分配内存,可以认为是Go程序持有的整个堆空间,全局唯一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var mheap_ mheap
type mheap struct {
lock mutex // 全局锁
pages pageAlloc // 页面分配的数据结构
allspans []*mspan // 所有通过 mheap_ 申请的mspans
// 堆
arenas [1 << arenaL1Bits]*[1 << arenaL2Bits]*heapArena

// 所有中心缓存mcentral
central [numSpanClasses]struct {
mcentral mcentral
pad [cpu.CacheLinePadSize - unsafe.Sizeof(mcentral{})%cpu.CacheLinePadSize]byte
}
...
}

所有mcentral的集合则是存放于mheap中的。mheap里的arena 区域是堆内存的抽象,运行时会将 8KB 看做一页,这些内存页中存储了所有在堆上初始化的对象。运行时使用二维的 runtime.heapArena 数组管理所有的内存,每个 runtime.heapArena 都会管理 64MB 的内存。

当申请内存时,依次经过 mcachemcentral 都没有可用合适规格的大小内存,这时候会向 mheap 申请一块内存。然后按指定规格划分为一些列表,并将其添加到相同规格大小的 mcentral非空闲列表 后面

分配对象
  • 微对象 (0, 16B):先使用线程缓存上的微型分配器,再依次尝试线程缓存、中心缓存、堆 分配内存;
  • 小对象 [16B, 32KB]:依次尝试线程缓存、中心缓存、堆 分配内存;
  • 大对象 (32KB, +∞):直接尝试堆分配内存;
分配流程
  • 首先通过计算使用的大小规格
  • 然后使用mcache中对应大小规格的块分配。
  • 如果mcentral中没有可用的块,则向mheap申请,并根据算法找到最合适的mspan
  • 如果申请到的mspan 超出申请大小,将会根据需求进行切分,以返回用户所需的页数。剩余的页构成一个新的 mspan 放回 mheap 的空闲列表。
  • 如果 mheap 中没有可用 span,则向操作系统申请一系列新的页(最小 1MB)

img

Go 内存逃逸机制?

概念

在一段程序中,每一个函数都会有自己的内存区域存放自己的局部变量、返回地址等,这些内存会由编译器在栈中进行分配,每一个函数都会分配一个栈桢,在函数运行结束后进行销毁,但是有些变量我们想在函数运行结束后仍然使用它,那么就需要把这个变量在堆上分配,这种从”栈”上逃逸到”堆”上的现象就成为内存逃逸。

在栈上分配的地址,一般由系统申请和释放,不会有额外性能的开销,比如函数的入参、局部变量、返回值等。在堆上分配的内存,如果要回收掉,需要进行 GC,那么GC 一定会带来额外的性能开销。编程语言不断优化GC算法,主要目的都是为了减少 GC带来的额外性能开销,变量一旦逃逸会导致性能开销变大。

逃逸机制

编译器会根据变量是否被外部引用来决定是否逃逸:

  1. 如果函数外部没有引用,则优先放到栈中;
  2. 如果函数外部存在引用,则必定放到堆中;
  3. 如果栈上放不下,则必定放到堆上;

逃逸分析也就是由编译器决定哪些变量放在栈,哪些放在堆中,通过编译参数-gcflag=-m可以查看编译过程中的逃逸分析,发生逃逸的几种场景如下:

指针逃逸
1
2
3
4
5
6
7
8
9
10
package main

func escape1() *int {
var a int = 1
return &a
}

func main() {
escape1()
}

通过go build -gcflags=-m main.go查看逃逸情况:

1
./main.go:4:6: moved to heap: a

函数返回值为局部变量的指针,函数虽然退出了,但是因为指针的存在,指向的内存不能随着函数结束而回收,因此只能分配在堆上。

栈空间不足
1
2
3
4
5
6
7
8
9
10
11
12
package main

func escape2() {
s := make([]int, 0, 10000)
for index, _ := range s {
s[index] = index
}
}

func main() {
escape2()
}

通过go build -gcflags=-m main.go查看逃逸情况:

1
./main.go:4:11: make([]int, 10000, 10000) escapes to heap

当栈空间足够时,不会发生逃逸,但是当变量过大时,已经完全超过栈空间的大小时,将会发生逃逸到堆上分配内存。局部变量s占用内存过大,编译器会将其分配到堆上

变量大小不确定
1
2
3
4
5
6
7
8
9
10
11
12
13
package main

func escape3() {
number := 10
s := make([]int, number) // 编译期间无法确定slice的长度
for i := 0; i < len(s); i++ {
s[i] = i
}
}

func main() {
escape3()
}

编译期间无法确定slice的长度,这种情况为了保证内存的安全,编译器也会触发逃逸,在堆上进行分配内存。直接s := make([]int, 10)不会发生逃逸

动态类型

动态类型就是编译期间不确定参数的类型、参数的长度也不确定的情况下就会发生逃逸

空接口 interface{} 可以表示任意的类型,如果函数参数为 interface{},编译期间很难确定其参数的具体类型,也会发生逃逸。

1
2
3
4
5
6
7
8
9
10
11
package main

import "fmt"

func escape4() {
fmt.Println(1111)
}

func main() {
escape4()
}

通过go build -gcflags=-m main.go查看逃逸情况:

1
./main.go:6:14: 1111 escapes to heap

fmt.Println(a …interface{})函数参数为interface,编译器不确定参数的类型,会将变量分配到堆上

闭包引用对象
1
2
3
4
5
6
7
8
9
10
11
12
13
package main

func escape5() func() int {
var i int = 1
return func() int {
i++
return i
}
}

func main() {
escape5()
}

通过go build -gcflags=-m main.go查看逃逸情况:

1
./main.go:4:6: moved to heap: i

闭包函数中局部变量i在后续函数是继续使用的,编译器将其分配到堆上

总结
  1. 栈上分配内存比在堆中分配内存效率更高
  2. 栈上分配的内存不需要 GC 处理,而堆需要
  3. 逃逸分析目的是决定内分配地址是栈还是堆
  4. 逃逸分析在编译阶段完成

因为无论变量的大小,只要是指针变量都会在堆上分配,所以对于小变量我们还是使用传值效率(而不是传指针)更高一点。

Go 内存对齐机制?

什么是内存对齐

为了能让CPU可以更快的存取到各个字段,Go编译器会帮你把struct结构体做数据的对齐。所谓的数据对齐,是指内存地址是所存储数据大小(按字节为单位)的整数倍,以便CPU可以一次将该数据从内存中读取出来。 编译器通过在结构体的各个字段之间填充一些空白已达到对齐的目的。

对齐系数

不同硬件平台占用的大小和对齐值都可能是不一样的,每个特定平台上的编译器都有自己的默认”对齐系数”,32位系统对齐系数是4,64位系统对齐系数是8

不同类型的对齐系数也可能不一样,使用Go语言中的unsafe.Alignof函数可以返回相应类型的对齐系数,对齐系数都符合2^n这个规律,最大也不会超过8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
"unsafe"
)

func main() {
fmt.Printf("bool alignof is %d\n", unsafe.Alignof(bool(true)))
fmt.Printf("string alignof is %d\n", unsafe.Alignof(string("a")))
fmt.Printf("int alignof is %d\n", unsafe.Alignof(int(0)))
fmt.Printf("float alignof is %d\n", unsafe.Alignof(float64(0)))
fmt.Printf("int32 alignof is %d\n", unsafe.Alignof(int32(0)))
fmt.Printf("float32 alignof is %d\n", unsafe.Alignof(float32(0)))
}

可以查看到各种类型在Mac 64位上的对齐系数如下:

1
2
3
4
5
6
bool alignof is 1
string alignof is 8
int alignof is 8
int32 alignof is 4
float32 alignof is 4
float alignof is 8
优点
  1. 提高可移植性,有些CPU可以访问任意地址上的任意数据,而有些CPU只能在特定地址访问数据,因此不同硬件平台具有差异性,这样的代码就不具有移植性,如果在编译时,将分配的内存进行对齐,这就具有平台可以移植性了
  2. 提高内存的访问效率,32位CPU下一次可以从内存中读取32位(4个字节)的数据,64位CPU下一次可以从内存中读取64位(8个字节)的数据,这个长度也称为CPU的字长。CPU一次可以读取1个字长的数据到内存中,如果所需要读取的数据正好跨了1个字长,那就得花两个CPU周期的时间去读取了。因此在内存中存放数据时进行对齐,可以提高内存访问效率。
缺点
  1. 存在内存空间的浪费,实际上是空间换时间

结构体对齐

对齐原则:

  1. 结构体变量中成员的偏移量必须是成员大小的整数倍
  2. 整个结构体的地址必须是最大字节的整数倍(结构体的内存占用是1/4/8/16byte…)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"runtime"
"unsafe"
)

type T1 struct {
i16 int16 // 2 byte
bool bool // 1 byte
}

type T2 struct {
i8 int8 // 1 byte
i64 int64 // 8 byte
i32 int32 // 4 byte
}

type T3 struct {
i8 int8 // 1 byte
i32 int32 // 4 byte
i64 int64 // 8 byte
}

func main() {
fmt.Println(runtime.GOARCH) // amd64

t1 := T1{}
fmt.Println(unsafe.Sizeof(t1)) // 4 bytes

t2 := T2{}
fmt.Println(unsafe.Sizeof(t2)) // 24 bytes

t3 := T3{}
fmt.Println(unsafe.Sizeof(t3)) // 16 bytes
}

以T1结构体为例,实际存储数据的只有3字节,但实际用了4字节,浪费了1个字节:

i16并没有直接放在bool的后面,而是在bool中填充了一个空白后,放到了偏移量为2的位置上。如果i16从偏移量为1的位置开始占用2个字节,根据对齐原则2:构体变量中成员的偏移量必须是成员大小的整数倍,套用公式 1 % 2 = 1,就不满足对齐的要求,所以i16从偏移量为2的位置开始

img

以T2结构体为例,实际存储数据的只有13字节,但实际用了24字节,浪费了11个字节:

img

以T3结构体为例,实际存储数据的只有13字节,但实际用了16字节,浪费了3个字节:

img

Go GC实现原理?

什么是GC?

垃圾回收也称为GC(Garbage Collection),是一种自动内存管理机制

现代高级编程语言管理内存的方式分为两种:自动和手动,像C、C++ 等编程语言使用手动管理内存的方式,工程师编写代码过程中需要主动申请或者释放内存;而 PHP、Java 和 Go 等语言使用自动的内存管理系统,有内存分配器和垃圾收集器来代为分配和回收内存,其中垃圾收集器就是我们常说的GC。

在应用程序中会使用到两种内存,分别为堆(Heap)和栈(Stack),GC负责回收堆内存,而不负责回收栈中的内存:

栈是线程的专用内存,专门为了函数执行而准备的,存储着函数中的局部变量以及调用栈,函数执行完后,编译器可以将栈上分配的内存可以直接释放,不需要通过GC来回收。

堆是程序共享的内存,需要GC进行回收在堆上分配的内存。

垃圾回收器的执行过程被划分为两个半独立的组件:

  • 赋值器(Mutator):这一名称本质上是在指代用户态的代码。因为对垃圾回收器而言,用户态的代码仅仅只是在修改对象之间的引用关系,也就是在对象图(对象之间引用关系的一个有向图)上进行操作。
  • 回收器(Collector):负责执行垃圾回收的代码。

主流GC算法

目前比较常见的垃圾回收算法有三种:

  1. 引用计数:为每个对象维护一个引用计数,当引用该对象的对象销毁时,引用计数 -1,当对象引用计数为 0 时回收该对象。
    • 代表语言:PythonPHPSwift
    • 优点:对象回收快,不会出现内存耗尽或达到某个阈值时才回收。
    • 缺点:不能很好的处理循环引用,而实时维护引用计数也是有损耗的。
  2. 分代收集:按照对象生命周期长短划分不同的代空间,生命周期长的放入老年代,短的放入新生代,不同代有不同的回收算法和回收频率。
    • 代表语言:Java
    • 优点:回收性能好
    • 缺点:算法复杂
  3. 标记-清除:从根变量开始遍历所有引用的对象,标记引用的对象,没有被标记的进行回收。
    • 代表语言:Golang(三色标记法)
    • 优点:解决了引用计数的缺点。
    • 缺点:需要 STW,暂时停掉程序运行。

img

Go GC算法

三色标记法

此算法是在Go 1.5版本开始使用,Go 语言采用的是标记清除算法,并在此基础上使用了三色标记法和混合写屏障技术,GC过程和其他用户goroutine可并发运行,但需要一定时间的STW

三色标记法只是为了叙述方便而抽象出来的一种说法,实际上的对象是没有三色之分的。这里的三色,对应了垃圾回收过程中对象的三种状态:

  • 灰色:对象还在标记队列中等待
  • 黑色:对象已被标记,gcmarkBits 对应位为 1 (该对象不会在本次 GC 中被回收)
  • 白色:对象未被标记,gcmarkBits 对应位为 0 (该对象将会在本次 GC 中被清理)

step 1: 创建:白、灰、黑 三个集合

step 2: 将所有对象放入白色集合中

step 3: 遍历所有root对象,把遍历到的对象从白色集合放入灰色集合 (这里放入灰色集合的都是根节点的对象)

step 4: 遍历灰色集合,将灰色对象引用的对象从白色集合放入灰色集合,自身标记为黑色

step 5: 重复步骤4,直到灰色中无任何对象,其中用到2个机制:

  • 写屏障(Write Barrier):上面说到的 STW 的目的是防止 GC 扫描时内存变化引起的混乱,而写屏障就是让 goroutine 与 GC 同时运行的手段,虽然不能完全消除 STW,但是可以大大减少 STW 的时间。写屏障在 GC 的特定时间开启,开启后指针传递时会把指针标记,即本轮不回收,下次 GC 时再确定。
  • 辅助 GC(Mutator Assist):为了防止内存分配过快,在 GC 执行过程中,GC 过程中 mutator 线程会并发运行,而 mutator assist 机制会协助 GC 做一部分的工作。

step 6: 收集所有白色对象(垃圾)

root对象

根对象在垃圾回收的术语中又叫做根集合,它是垃圾回收器在标记过程时最先检查的对象,包括:

全局变量:程序在编译期就能确定的那些存在于程序整个生命周期的变量。 执行栈:每个 goroutine 都包含自己的执行栈,这些执行栈上指向堆内存的指针。 寄存器:寄存器的值可能表示一个指针,参与计算的这些指针可能指向某些赋值器分配的堆内存区块。

插入写屏障

对象被引用时触发的机制(只在堆内存中生效):赋值器这一行为通知给并发执行的回收器,被引用的对象标记为灰色

缺点:结束时需要STW来重新扫描栈,标记栈上引用的白色对象的存活

删除写屏障

对象被删除时触发的机制(只在堆内存中生效):赋值器将这一行为通知给并发执行的回收器,被删除的对象,如果自身为灰色或者白色,那么标记为灰色

缺点:一个对象的引用被删除后,即使没有其他存活的对象引用它,它仍然会活到下一轮,会产生很大冗余扫描成本,且降低了回收精度

混合写屏障

GC没有混合写屏障前,一直是插入写屏障;混合写屏障是插入写屏障 + 删除写屏障,写屏障只应用在堆上应用,栈上不启用(栈上启用成本很高)

  • GC开始将栈上的对象全部扫描并标记为黑色。
  • GC期间,任何在栈上创建的新对象,均为黑色。
  • 被删除的对象标记为灰色。
  • 被添加的对象标记为灰色。

GC流程

一次完整的垃圾回收会分为四个阶段,分别是标记准备、标记开始、标记终止、清理:

  1. 标记准备(Mark Setup):打开写屏障(Write Barrier),需 STW(stop the world)
  2. 标记开始(Marking):使用三色标记法并发标记 ,与用户程序并发执行
  3. 标记终止(Mark Termination):对触发写屏障的对象进行重新扫描标记,关闭写屏障(Write Barrier),需 STW(stop the world)
  4. 清理(Sweeping):将需要回收的内存归还到堆中,将过多的内存归还给操作系统,与用户程序并发执行

img

GC触发时机

主动触发:

  • 调用 runtime.GC() 方法,触发 GC

被动触发:

  • 定时触发,该触发条件由 runtime.forcegcperiod 变量控制,默认为 2 分 钟。当超过两分钟没有产生任何 GC 时,触发 GC
  • 根据内存分配阈值触发,该触发条件由环境变量GOGC控制,默认值为100(100%),当前堆内存占用是上次GC结束后占用内存的2倍时,触发GC

GC算法演进

  • Go 1:mark and sweep操作都需要STW
  • Go 1.3:分离了mark和sweep操作,mark过程需要 STW,mark完成后让sweep任务和普通协程任务一样并行,停顿时间在约几百ms
  • Go 1.5:引入三色并发标记法、插入写屏障,不需要每次都扫描整个内存空间,可以减少stop the world的时间,停顿时间在100ms以内
  • Go 1.6:使用 bitmap 来记录回收内存的位置,大幅优化垃圾回收器自身消耗的内存,停顿时间在10ms以内
  • Go 1.7:停顿时间控制在2ms以内
  • Go 1.8:混合写屏障(插入写屏障和删除写屏障),停顿时间在0.5ms左右
  • Go 1.9:彻底移除了栈的重扫描过程
  • Go 1.12:整合了两个阶段的 Mark Termination
  • Go 1.13:着手解决向操作系统归还内存的,提出了新的 Scavenger
  • Go 1.14:替代了仅存活了一个版本的 scavenger,全新的页分配器,优化分配内存过程的速率与现有的扩展性问题,并引入了异步抢占,解决了由于密集循环导致的 STW 时间过长的问题

Go GC如何调优?

  • 控制内存分配的速度,限制 Goroutine 的数量,提高赋值器 mutator 的 CPU 利用率(降低GC的CPU利用率)
  • 少量使用+连接string
  • slice提前分配足够的内存来降低扩容带来的拷贝
  • 避免map key对象过多,导致扫描时间增加
  • 变量复用,减少对象分配,例如使用 sync.Pool 来复用需要频繁创建临时对象、使用全局变量等
  • 增大 GOGC 的值,降低 GC 的运行频率

Go 如何查看GC信息?

1. GODEBUG=’gctrace=1’

1
2
3
4
5
6
package main
func main() {
for n := 1; n < 100000; n++ {
_ = make([]byte, 1<<20)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ GODEBUG='gctrace=1' go run main.go

gc 1 @0.003s 4%: 0.013+1.7+0.008 ms clock, 0.10+0.67/1.2/0.018+0.064 ms cpu, 4->6->2 MB, 5 MB goal, 8 P
gc 2 @0.006s 2%: 0.006+4.5+0.058 ms clock, 0.048+0.070/0.027/3.6+0.47 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 3 @0.011s 3%: 0.021+1.3+0.009 ms clock, 0.17+0.041/0.41/0.046+0.072 ms cpu, 4->6->2 MB, 5 MB goal, 8 P
gc 4 @0.013s 5%: 0.025+0.38+0.26 ms clock, 0.20+0.054/0.15/0.009+2.1 ms cpu, 4->6->2 MB, 5 MB goal, 8 P
gc 5 @0.014s 5%: 0.021+0.16+0.002 ms clock, 0.17+0.098/0.028/0.001+0.016 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 6 @0.014s 7%: 0.025+1.6+0.003 ms clock, 0.20+0.061/2.9/1.5+0.025 ms cpu, 4->6->2 MB, 5 MB goal, 8 P
gc 7 @0.016s 7%: 0.019+1.0+0.002 ms clock, 0.15+0.053/1.0/0.018+0.017 ms cpu, 4->6->2 MB, 5 MB goal, 8 P
gc 8 @0.017s 7%: 0.029+0.17+0.002 ms clock, 0.23+0.037/0.10/0.063+0.022 ms cpu, 4->4->0 MB, 5 MB goal, 8 P
gc 9 @0.018s 7%: 0.019+0.23+0.002 ms clock, 0.15+0.040/0.16/0.023+0.018 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 10 @0.018s 7%: 0.022+0.23+0.003 ms clock, 0.17+0.061/0.13/0.006+0.024 ms cpu, 4->6->2 MB, 5 MB goal, 8 P
gc 11 @0.018s 7%: 0.019+0.11+0.001 ms clock, 0.15+0.033/0.051/0.013+0.015 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 12 @0.019s 7%: 0.018+0.19+0.001 ms clock, 0.14+0.035/0.10/0.018+0.014 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 13 @0.019s 7%: 0.018+0.35+0.002 ms clock, 0.15+0.21/0.054/0.013+0.016 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 14 @0.019s 8%: 0.024+0.27+0.002 ms clock, 0.19+0.022/0.13/0.014+0.017 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 15 @0.020s 8%: 0.019+0.42+0.038 ms clock, 0.15+0.060/0.28/0.007+0.31 ms cpu, 4->17->13 MB, 5 MB goal, 8 P
gc 16 @0.021s 8%: 0.018+0.53+0.060 ms clock, 0.14+0.045/0.39/0.005+0.48 ms cpu, 21->28->7 MB, 26 MB goal, 8 P
gc 17 @0.021s 10%: 0.020+0.91+0.64 ms clock, 0.16+0.050/0.36/0.027+5.1 ms cpu, 12->16->4 MB, 14 MB goal, 8 P
gc 18 @0.023s 10%: 0.020+0.55+0.002 ms clock, 0.16+0.053/0.50/0.081+0.023 ms cpu, 7->9->2 MB, 8 MB goal, 8 P

字段含义由下表所示:

字段 含义
gc 2 第二个 GC 周期
0.006 程序开始后的 0.006 秒
2% 该 GC 周期中 CPU 的使用率
0.006 标记开始时, STW 所花费的时间(wall clock)
4.5 标记过程中,并发标记所花费的时间(wall clock)
0.058 标记终止时, STW 所花费的时间(wall clock)
0.048 标记开始时, STW 所花费的时间(cpu time)
0.070 标记过程中,标记辅助所花费的时间(cpu time)
0.027 标记过程中,并发标记所花费的时间(cpu time)
3.6 标记过程中,GC 空闲的时间(cpu time)
0.47 标记终止时, STW 所花费的时间(cpu time)
4 标记开始时,堆的大小的实际值
5 标记结束时,堆的大小的实际值
1 标记结束时,标记为存活的对象大小
5 标记结束时,堆的大小的预测值
8 P 的数量

2. go tool trace

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"os"
"runtime/trace"
)

func main() {
f, _ := os.Create("trace.out")
defer f.Close()
trace.Start(f)
defer trace.Stop()
for n := 1; n < 100000; n++ {
_ = make([]byte, 1<<20)
}
}
1
2
$ go run main.go
$ go tool trace trace.out

打开浏览器后,可以看到如下统计:

img

点击View trace,可以查看当时的trace情况

img

点击 Minimum mutator utilization,可以查看到赋值器 mutator (用户程序)对 CPU 的利用率 74.1%,接近100%则代表没有针对GC的优化空间了

img

3. debug.ReadGCStats

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"runtime/debug"
"time"
)

func printGCStats() {
t := time.NewTicker(time.Second)
s := debug.GCStats{}
for {
select {
case <-t.C:
debug.ReadGCStats(&s)
fmt.Printf("gc %d last@%v, PauseTotal %v\n", s.NumGC, s.LastGC, s.PauseTotal)
}
}
}
func main() {
go printGCStats()
for n := 1; n < 100000; n++ {
_ = make([]byte, 1<<20)
}
}
1
2
3
4
5
6
7
8
$ go run main.go

gc 3392 last@2022-05-04 19:22:52.877293 +0800 CST, PauseTotal 117.524907ms
gc 6591 last@2022-05-04 19:22:53.876837 +0800 CST, PauseTotal 253.254996ms
gc 10028 last@2022-05-04 19:22:54.87674 +0800 CST, PauseTotal 376.981595ms
gc 13447 last@2022-05-04 19:22:55.87689 +0800 CST, PauseTotal 511.420111ms
gc 16938 last@2022-05-04 19:22:56.876955 +0800 CST, PauseTotal 649.293449ms
gc 20350 last@2022-05-04 19:22:57.876756 +0800 CST, PauseTotal 788.003014ms

字段含义由下表所示:

字段 含义
NumGC GC总次数
LastGC 上次GC时间
PauseTotal STW总耗时

4. runtime.ReadMemStats

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"runtime"
"time"
)

func printMemStats() {
t := time.NewTicker(time.Second)
s := runtime.MemStats{}
for {
select {
case <-t.C:
runtime.ReadMemStats(&s)
fmt.Printf("gc %d last@%v, heap_object_num: %v, heap_alloc: %vMB, next_heap_size: %vMB\n",
s.NumGC, time.Unix(int64(time.Duration(s.LastGC).Seconds()), 0), s.HeapObjects, s.HeapAlloc/(1<<20), s.NextGC/(1<<20))
}
}
}
func main() {
go printMemStats()
fmt.Println(1 << 20)
for n := 1; n < 100000; n++ {
_ = make([]byte, 1<<20)
}
}

1
2
3
4
5
6
7
8
$ go run main.go

gc 2978 last@2022-05-04 19:38:04 +0800 CST, heap_object_num: 391, heap_alloc: 20MB, next_heap_size: 28MB
gc 5817 last@2022-05-04 19:38:05 +0800 CST, heap_object_num: 370, heap_alloc: 4MB, next_heap_size: 4MB
gc 9415 last@2022-05-04 19:38:06 +0800 CST, heap_object_num: 392, heap_alloc: 7MB, next_heap_size: 8MB
gc 11429 last@2022-05-04 19:38:07 +0800 CST, heap_object_num: 339, heap_alloc: 4MB, next_heap_size: 5MB
gc 14706 last@2022-05-04 19:38:08 +0800 CST, heap_object_num: 436, heap_alloc: 6MB, next_heap_size: 8MB
gc 18253 last@2022-05-04 19:38:09 +0800 CST, heap_object_num: 375, heap_alloc: 4MB, next_heap_size: 6M

字段含义由下表所示:

字段 含义
NumGC GC总次数
LastGC 上次GC时间
HeapObjects 堆中已经分配的对象总数,GC内存回收后HeapObjects取值相应减小
HeapAlloc 堆中已经分配给对象的字节数,GC内存回收后HeapAlloc取值相应减小
NextGC 下次GC目标堆的大小

并发编程

Go 常用的并发模型?

并发模型说的是系统中的线程如何协作完成并发任务,不同的并发模型,线程以不同的方式进行通信和协作。

线程间通信方式

线程间通信方式有两种:共享内存和消息传递,无论是哪种通信模型,线程或者协程最终都会从内存中获取数据,所以更为准确的说法是直接共享内存、发送消息的方式来同步信息

共享内存

抽象层级:抽象层级低,当我们遇到对资源进行更细粒度的控制或者对性能有极高要求的场景才应该考虑抽象层级更低的方法

耦合:高,线程需要在读取或者写入数据时先获取保护该资源的互斥锁

线程竞争:需要加锁,才能避免线程竞争和数据冲突

发送消息

抽象层级:抽象层级高,提供了更良好的封装和与领域更相关和契合的设计,比如Go 语言中的Channel就提供了 Goroutine 之间用于传递信息的方式,它在内部实现时就广泛用到了共享内存和锁,通过对两者进行的组合提供了更高级的同步机制

耦合:低,生产消费者模型

线程竞争:保证同一时间只有一个活跃的线程能够访问数据,channel维护所有被该chanel阻塞的协程,保证有资源的时候只唤醒一个协程,从而避免竞争

Go语言中实现了两种并发模型,一种是共享内存并发模型,另一种则是CSP模型。

共享内存并发模型

通过直接共享内存 + 锁的方式同步信息,传统多线程并发

img

CSP并发模型

通过发送消息的方式来同步信息,Go语言推荐使用的通信顺序进程(communicating sequential processes)并发模型,通过goroutine和channel来实现

  • goroutine 是Go语言中并发的执行单位,可以理解为”线程“
  • channel是Go语言中各个并发结构体(goroutine)之前的通信机制。 通俗的讲,就是各个goroutine之间通信的”管道“,类似于Linux中的管道

img

Go 有哪些并发同步原语?

Go是一门以并发编程见长的语言,它提供了一系列的同步原语方便开发者使用

原子操作

Mutex、RWMutex 等并发原语的底层实现是通过 atomic 包中的一些原子操作来实现的,原子操作是最基础的并发原语

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync/atomic"
)

var opts int64 = 0

func main() {
add(&opts, 3)
load(&opts)
compareAndSwap(&opts, 3, 4)
swap(&opts, 5)
store(&opts, 6)
}

func add(addr *int64, delta int64) {
atomic.AddInt64(addr, delta) //加操作
fmt.Println("add opts: ", *addr)
}

func load(addr *int64) {
fmt.Println("load opts: ", atomic.LoadInt64(&opts))
}

func compareAndSwap(addr *int64, oldValue int64, newValue int64) {
if atomic.CompareAndSwapInt64(addr, oldValue, newValue) {
fmt.Println("cas opts: ", *addr)
return
}
}

func swap(addr *int64, newValue int64) {
atomic.SwapInt64(addr, newValue)
fmt.Println("swap opts: ", *addr)
}

func store(addr *int64, newValue int64) {
atomic.StoreInt64(addr, newValue)
fmt.Println("store opts: ", *addr)
}

Channel

channel 管道,高级同步原语,goroutine之间通信的桥梁

使用场景:消息队列、数据传递、信号通知、任务编排、锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"time"
)

func main() {
c := make(chan struct{}, 1)
for i := 0; i < 10; i++ {
go func() {
c <- struct{}{}
time.Sleep(1 * time.Second)
fmt.Println("通过ch访问临界区")
<-c
}()
}
for {
}
}

基本并发原语

Go 语言在 sync包中提供了用于同步的一些基本原语,这些基本原语提供了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。

常见的并发原语如下:sync.Mutexsync.RWMutexsync.WaitGroupsync.Condsync.Oncesync.Poolsync.Context

sync.Mutex

sync.Mutex (互斥锁) 可以限制对临界资源的访问,保证只有一个 goroutine 访问共享资源

使用场景:大量读写,比如多个 goroutine 并发更新同一个资源,像计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"sync"
)

func main() {
// 封装好的计数器
var counter Counter
var wg sync.WaitGroup
var gNum = 1000
wg.Add(gNum)
// 启动10个goroutine
for i := 0; i < gNum; i++ {
go func() {
defer wg.Done()
counter.Incr() // 受到锁保护的方法
}()
}
wg.Wait()
fmt.Println(counter.Count())
}

// 线程安全的计数器类型
type Counter struct {
mu sync.Mutex
count uint64
}

// 加1的方法,内部使用互斥锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

// 得到计数器的值,也需要锁保护
func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
sync.RWMutex

sync.RWMutex (读写锁) 可以限制对临界资源的访问,保证只有一个 goroutine 写共享资源,可以有多个goroutine 读共享资源

使用场景:大量并发读,少量并发写,有强烈的性能要求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"sync"
"time"
)

func main() {
// 封装好的计数器
var counter Counter
var gNum = 1000
// 启动10个goroutine
for i := 0; i < gNum; i++ {
go func() {
counter.Count() // 受到锁保护的方法
}()
}
for { // 一个writer
counter.Incr() // 计数器写操作
fmt.Println("incr")
time.Sleep(time.Second)
}
}

// 线程安全的计数器类型
type Counter struct {
mu sync.RWMutex
count uint64
}

// 加1的方法,内部使用互斥锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

// 得到计数器的值,也需要锁保护
func (c *Counter) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}
sync.WaitGroup

sync.WaitGroup 可以等待一组 Goroutine 的返回

使用场景:并发等待,任务编排,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求

1
2
3
4
5
6
7
8
9
10
11
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
sync.Cond

sync.Cond 可以让一组的 Goroutine 都在满足特定条件时被唤醒

使用场景:利用等待 / 通知机制实现阻塞或者唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

var status int64

func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
time.Sleep(1 * time.Second)
}

func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Signal()
c.L.Unlock()
}

func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
sync.Once

sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次

使用场景:常常用于单例对象的初始化场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
"sync"
)

func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
sync.Pool

sync.Pool可以将暂时将不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能(频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺)

使用场景:对象池化, TCP连接池、数据库连接池、Worker Pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"sync"
)

func main() {
pool := sync.Pool{
New: func() interface{} {
return 0
},
}

for i := 0; i < 10; i++ {
v := pool.Get().(int)
fmt.Println(v) // 取出来的值是put进去的,对象复用;如果是新建对象,则取出来的值为0
pool.Put(i)
}
}
sync.Map

sync.Map 线程安全的map

使用场景:map 并发读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"sync"
)

func main() {
var scene sync.Map
// 将键值对保存到sync.Map
scene.Store("1", 1)
scene.Store("2", 2)
scene.Store("3", 3)
// 从sync.Map中根据键取值
fmt.Println(scene.Load("1"))
// 根据键删除对应的键值对
scene.Delete("1")
// 遍历所有sync.Map中的键值对
scene.Range(func(k, v interface{}) bool {
fmt.Println("iterate:", k, v)
return true
})
}
sync.Context

sync.Context 可以进行上下文信息传递、提供超时和取消机制、控制子 goroutine 的执行

使用场景:取消一个goroutine的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"context"
"fmt"
"time"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer func() {
fmt.Println("goroutine exit")
}()
for {
select {
case <-ctx.Done():
fmt.Println("receive cancel signal!")
return
default:
fmt.Println("default")
time.Sleep(time.Second)
}
}
}()
time.Sleep(time.Second)
cancel()
time.Sleep(2 * time.Second)
}

扩展并发原语

ErrGroup

errgroup 可以在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能

使用场景:只要一个 goroutine 出错我们就不再等其他 goroutine 了,减少资源浪费,并且返回错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"net/http"

"golang.org/x/sync/errgroup"
)

func main() {
var g errgroup.Group
var urls = []string{
"http://www.baidu.com/",
"https://www.sina.com.cn/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
err := g.Wait()
if err == nil {
fmt.Println("Successfully fetched all URLs.")
} else {
fmt.Println("fetched error:", err.Error())
}
}
Semaphore

Semaphore带权重的信号量,控制多个goroutine同时访问资源

使用场景:控制 goroutine 的阻塞和唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"context"
"fmt"
"log"
"runtime"
"time"

"golang.org/x/sync/semaphore"
)

var (
maxWorkers = runtime.GOMAXPROCS(0)
sema = semaphore.NewWeighted(int64(maxWorkers)) //信号量
task = make([]int, maxWorkers*4)

// 任务数,是worker的四
)

func main() {
ctx := context.Background()
for i := range task {
// 如果没有worker可用,会阻塞在这里,直到某个worker被释放
if err := sema.Acquire(ctx, 1); err != nil {
break
}
// 启动worker goroutine
go func(i int) {
defer sema.Release(1)
time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作
task[i] = i + 1
}(i)
}
// 请求所有的worker,这样能确保前面的worker都执行完
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("获取所有的worker失败: %v", err)
}
fmt.Println(maxWorkers, task)
}
SingleFlight

用于抑制对下游的重复请求

使用场景:访问缓存、数据库等场景,缓存过期时只有一个请求去更新数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/singleflight"
)

// 模拟从数据库读取
func getArticle(id int) (article string, err error) {
// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(count) * time.Millisecond)

return fmt.Sprintf("article: %d", id), nil
}

// 模拟优先读缓存,缓存不存在读取数据库,并且只有一个请求读取数据库,其它请求等待
func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
return getArticle(id)
})

return v.(string), err
}

var count int32

func main() {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})

var (
wg sync.WaitGroup
now = time.Now()
n = 1000
sg = &singleflight.Group{}
)

for i := 0; i < n; i++ {
wg.Add(1)
go func() {
res, _ := singleflightGetArticle(sg, 1)
// res, _ := getArticle(1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
}

wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}

Go WaitGroup实现原理?

概念

Go标准库提供了WaitGroup原语, 可以用它来等待一批 Goroutine 结束

底层数据结构

1
2
3
4
5
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}

其中 noCopy 是 golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet 检查程序时,就会发现有报错。但需要注意的是,noCopy 不会影响程序正常的编译和运行。

state1主要是存储着状态和信号量,状态维护了 2 个计数器,一个是请求计数器counter ,另外一个是等待计数器waiter(已调用 WaitGroup.Wait 的 goroutine 的个数)

当数组的首地址是处于一个8字节对齐的位置上时,那么就将这个数组的前8个字节作为64位值使用表示状态,后4个字节作为32位值表示信号量(semaphore);同理如果首地址没有处于8字节对齐的位置上时,那么就将前4个字节作为semaphore,后8个字节作为64位数值。

img

使用方法

在WaitGroup里主要有3个方法:

  • WaitGroup.Add():可以添加或减少请求的goroutine数量,*Add(n) 将会导致 counter += n*
  • WaitGroup.Done():相当于Add(-1),Done() 将导致 counter -=1,请求计数器counter为0 时通过信号量调用runtime_Semrelease唤醒waiter线程
  • WaitGroup.Wait():会将 waiter++,同时通过信号量调用 runtime_Semacquire(semap)阻塞当前 goroutine
1
2
3
4
5
6
7
8
9
10
11
12
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
println("hello")
}()
}

wg.Wait()
}

Go Cond实现原理?

概念

Go标准库提供了Cond原语,可以让 Goroutine 在满足特定条件时被阻塞和唤醒

底层数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker

notify notifyList
checker copyChecker
}

type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}

主要有4个字段:

  • nocopy : golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet 检查程序时,就会发现有报错,但需要注意的是,noCopy 不会影响程序正常的编译和运行
  • checker:用于禁止运行期间发生拷贝,双重检查(Double check)
  • L:可以传入一个读写锁或互斥锁,当修改条件或者调用Wait方法时需要加锁
  • notify:通知链表,调用Wait()方法的Goroutine会放到这个链表中,从这里获取需被唤醒的Goroutine列表

使用方法

在Cond里主要有3个方法:

  • sync.NewCond(l Locker): 新建一个 sync.Cond 变量,注意该函数需要一个 Locker 作为必填参数,这是因为在 cond.Wait() 中底层会涉及到 Locker 的锁操作
  • Cond.Wait(): 阻塞等待被唤醒,调用Wait函数前需要先加锁;并且由于Wait函数被唤醒时存在虚假唤醒等情况,导致唤醒后发现,条件依旧不成立,因此需要使用 for 语句来循环地进行等待,直到条件成立为止
  • Cond.Signal(): 只唤醒一个最先 Wait 的 goroutine,可以不用加锁
  • Cond.Broadcast(): 唤醒所有Wait的goroutine,可以不用加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

var status int64

func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
go broadcast(c)
time.Sleep(1 * time.Second)
}

func broadcast(c *sync.Cond) {
// 原子操作
atomic.StoreInt64(&status, 1)
c.Broadcast()
}

func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
// Wait 内部会先调用 c.L.Unlock(),来先释放锁,如果调用方不先加锁的话,会报错
}
fmt.Println("listen")
c.L.Unlock()
}

Go 有哪些方式安全读写共享变量?

方法 并发原语 备注
不要修改变量 sync.Once 不要去写变量,变量只初始化一次
只允许一个goroutine访问变量 Channel 不要通过共享变量来通信,通过通信(channel)来共享变量
允许多个goroutine访问变量,但是同一时间只允许一个goroutine访问 sync.Mutex、sync.RWMutex、原子操作 实现锁机制,同时只有一个线程能拿到

Go 如何排查数据竞争问题?

概念

只要有两个以上的goroutine并发访问同一变量,且至少其中的一个是写操作的时候就会发生数据竞争;全是读的情况下是不存在数据竞争的。

排查方式

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func main() {
i := 0

go func() {
i++ // write i
}()

fmt.Println(i) // read i
}

go命令行有个参数race可以帮助检测代码中的数据竞争

1
2
3
4
5
$ go run -race main.go

WARNING: DATA RACE
Write at 0x00c0000ba008 by goroutine 7:
exit status 66
  • 标题: Go语言面试题精讲
  • 作者: Olivia的小跟班
  • 创建于 : 2023-08-28 18:57:28
  • 更新于 : 2023-09-22 01:07:14
  • 链接: https://www.youandgentleness.cn/2023/08/28/Go语言面试题精讲/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论
此页目录
Go语言面试题精讲