Go In Action 读书笔记 三

并发

Go语言里的并发是指让某个函数可以独立于其他函数运行的能力. 当一个函数创建为goroutine时, Go会将其视为一个独立的工作单元. 这个工作单元会被调度到可用的逻辑处理器上执行.

Go的运行时调度器可以管理所有创建的goroutine, 并为其分配执行时间. 这个调度器在操作系统之上, 将操作系统的线程与逻辑处理器绑定, 并在逻辑处理器执行goroutine. 调度器可以在任何给定的时间, 全面控制哪个goroutine在哪个逻辑处理器上运行.

Go的并发同步模型来自一个叫做通信顺序进程(Communicating Sequential Processes, CSP). CSP是一个消息传递模型, 通过在goroutine之前传递数据来传递消息, 不需要通过加锁实现同步访问. 用于在goroutine间传递消息的数据结构叫做通道(channel).

并发与并行

操作系统的线程(thread)和进程(process).

进程类似应用程序在运行中需要用到和维护的各种资源的容器. 资源包括但不限于: 内存(来自文件系统的代码和数据), 句柄(文件, 设备, 操作系统), 线程.

每个进程至少有一个线程, 一个线程是一个执行空间. 这个空间会被操作系统调度来运行函数中所写的代码. 每个线程的初始线程被称为主线程. 主线程终止时, 应用程序也会终止.操作系统将线程调度到某个处理器上运行, 这个处理器不一定是进程所在的处理器.

Go语言的运行时会在逻辑处理器上调度goroutine运行. 每个逻辑处理器都分别绑定到单个操作系统线程. Go语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器.

创建一个gorouine并准备运行, 这个goroutine就会被放到调度器的全局运行队列中. 之后, 调度器就将这些队列中的goroutine分配给一个逻辑处理器, 并放到该逻辑处理器对应的本地运行队列, 然后在队列中等待被逻辑处理器执行.

如果goroutine执行了阻塞线程的调用, 调度器会将这个操作系统线程与逻辑处理器分离, 并创建一个新的线程与逻辑处理器绑定, 然后. 一旦阻塞的调用完成, 该goroutine会回到本地运行队列.

如果阻塞调用是网络I/O, goroutine会与逻辑处理器分离, 移到集成了网络轮询器的运行时. 一旦轮询器指示某个网络的读或写操作已经就绪, 对应的goroutine就会重新分配到逻辑处理器上完成操作.

调度器对可以创建的逻辑处理器的数量没有限制, 但是语言运行时默认限制每个程序最多创建10000个线程. 可以通过调用runtime/debug包的SetMaxThreads方法来更改.

并发(concurrency)不是并行(parallelism)

并行是让不同的代码同时在不同的物理处理器上执行. 并行的关键是同时做很多事. 并发是指同时管理很多事情, 这些事情可能只做一般就再暂停去做别的事情了.

使用较少的资源做更多的事情

多个逻辑处理器时, goroutine会被平均分配到每个逻辑处理器上, 让goroutine在不同的线程上运行.

goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import (
	"fmt"
	"sync"
	"runtime"
)

func main() {
	//分配一个逻辑处理器给调度器使用
	runtime.GOMAXPROCS(1)

	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Printf("%s\n", "Start")

	go func(){
		defer wg.Done()
		for i :=0 ; i < 3; i++ {
			for ch := 'a'; ch < 'a' + 26; ch ++ {
				fmt.Printf("%c ", ch)
			}
		}
	}()

	go func(){
		defer wg.Done()
		for i :=0 ; i < 3; i++ {
			for ch := 'A'; ch < 'A' + 26; ch ++ {
				fmt.Printf("%c ", ch)
			}
		}
	}()

	fmt.Println("Wait")
	wg.Wait()
	fmt.Println("\nEnd")
}

//结果
//Start
//Wait
//A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z 
//End

//第一个goroutine完成所有显示需要的时间太短, 以至于在调度器切换到第二个goroutine之前就完成了所有任务. 

程序可以使用runtime.GOMAXPROCS来更改调度器可以下使用的逻辑处理器的数量. 如果不想代码里使用, 可以使用跟函数同名的环境变量(GOMAXPROCS)来设置. 使用runtime.NumCPU()可以获取物理处理器的个数.

WaitGroup是一个计数信号量, 可以用来记录并维护运行的goroutine. 使用defer在goroutine函数调用完成后调用Done方法.

一个正在运行的goroutine在工作结束前, 可以被停止(回到本地队列)并重新调度. 防止某个goroutine长时间占用逻辑处理器.

竞争状态

race condition: 多个goroutine在没有互相同步的情况系啊, 访问某个共享的资源, 并试图同时读和写这个资源, 存在竞争的状态.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import (
	"fmt"
	"sync"
	"runtime"
)


var (
	counter int
	wg sync.WaitGroup
)

func main() {
	wg.Add(2)

	go incCounter(1)
	go incCounter(2)

	wg.Wait()
	fmt.Println("Final counter: ", counter)

}

func incCounter(id int) {
	defer wg.Done()

	for i:= 0; i < 2; i++ {
		val := counter
		//当前goroutine从线程退出, 并回到队列
		runtime.Gosched()

		val++

		counter = val
	}
}
//结果
//Final counter:  2

非原子操作导致最后结果为2

锁住共享资源

使用atomicsync包的函数

原子函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import (
	"fmt"
	"sync"
	"sync/atomic"
	"runtime"
)

var (
	counter int64
	wg sync.WaitGroup
)

func main() {
	wg.Add(2)

	go incCounter(1)
	go incCounter(2)

	wg.Wait()
	fmt.Println("Final counter: ", counter)

}

func incCounter(id int) {
	defer wg.Done()

	for i:= 0; i < 2; i++ {
	   //安全地对counter加1
		atomic.AddInt64(&counter, 1)
		//当前goroutine从线程退出, 并回到队列
		runtime.Gosched()
	}
}

atomic包的AddInt64函数, 会同步整型值的加法, 方法是强制同一时刻只能有一个goroutine运行并完成这个加法操作. 还有LoadInt64StoreInt64函数, 提供安全的读写整型值的方式.

互斥锁

使用互斥锁mutex, 名字来自互斥mutual exclusion的概念. 在代码上创建一个链接去, 保证同一时间只有一个goroutine可以执行这个临界区的代码.

临界区的代码可以使用大括号{}包围, 提升可读性.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import (
	"fmt"
	"sync"
	"runtime"
)

var (
	counter int64
	wg sync.WaitGroup
	mutex sync.Mutex
)

func main() {
	wg.Add(2)

	go incCounter(1)
	go incCounter(2)

	wg.Wait()
	fmt.Println("Final counter: ", counter)

}

func incCounter(id int) {
	defer wg.Done()

	for i:= 0; i < 2; i++ {
      //创建临界区
		mutex.Lock()
		{
			val := counter
			//当前goroutine从线程退出, 并回到队列
			runtime.Gosched()
			val++
			counter = val
		}
		mutex.Unlock()
	}
}

通道

当一个资源需要在goroutine之间共享时, 通道在goroutine之前架起了一个管道, 并提供了确保同步交换数据的机制.

声明通道时需要指定要共享的数据类型, 包括共享内置类型, 命名类型, 结构类型和引用类型的值或者指针.

需要使用关键字make创建通道. make的第一个参数需要关键字chan, 之后跟着交换的数据的类型. 如果是创建的有缓冲的通道, 第二个参数要指定通道的缓冲区的大小.

1
2
3
4
//无缓冲的整形通道
unbuffered := make(chan int)
//有缓冲的字符串通道
buffered := make(chan string, 10)

通道操作

1
2
3
4
//写字符串到通道
buffered <- "Gopher"
//从通道接收一个字符串
value := <- buffered
无缓冲通道

unbuffered channel是指在接收前没有能力保存任何值的通道. 这种通道要求发送goroutine和接收goroutine同时准备好, 才能完成发送和接收操作. 如果没有同时准备好, 会导致先执行发送或接收操作的goroutine阻塞等待.

无缓冲t通道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import (
	"fmt"
	"sync"
	"time"
	"math/rand"
)

var wg sync.WaitGroup

func init(){
	rand.Seed(time.Now().UnixNano())
}

func main() {
	wg.Add(2)

	court := make(chan int)

	go player("Lisa", court)
	go player("Bill", court)

	court <- 1

	wg.Wait()
}
func player(name string, court chan int) {
	defer wg.Done()
	for {
		ball, ok := <- court
		if !ok {
			fmt.Printf("Player %s Won\n", name)
			return
		}

		n := rand.Intn(100)

		if n%13 == 0 {
			fmt.Printf("Player %s missed\n", name)
			close(court)
			return
		}

		fmt.Printf("Player %s Hit %d\n", name, ball)

		ball++

		court <- ball
	}
}
有缓冲通道

buffered channel是一种在被接收前能存储一个或者多个值的通道. 并不要求goroutine之间必须同时完成发送和接收.

只有在缓冲区里没有数据的时候接收才会阻塞; 同样只有缓冲区满的时候发送才会阻塞.

有缓冲t通道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import (
	"sync"
	"math/rand"
	"time"
	"fmt"
)

const (
	workers  = 4
	taskLoad = 10
)

var wg sync.WaitGroup

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	tasks := make(chan string, taskLoad)

	wg.Add(workers)

	for i := 0; i < workers; i++ {
		go worker(tasks, i)
	}

	for i := 0; i < taskLoad; i++ {
		tasks <- fmt.Sprintf("Task : %d", i)
	}

	close(tasks)

	wg.Wait()
}
func worker(tasks chan string, id int) {
	defer wg.Done()

	for {
		task, ok := <-tasks

		if !ok {
			fmt.Printf("Work %d shutting down\n", id)
			return
		}

		fmt.Printf("Worker: %d : Started %s\n", id, task)

		sleep := rand.Int63n(100)

		time.Sleep(time.Duration(sleep) * time.Millisecond)

		fmt.Printf("Worker : %d : Completed %s\n", id, task)
	}
}

Comments

comments powered by Disqus