Go In Action 读书笔记 四

并发模式

runner

runner展示了如何使用通道来监视程序的执行时间, 如果程序执行时间太长, 也可以用终止程序. 这个程序可用作corn作业执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package runner

import (
	"os"
	"time"
	"errors"
	"os/signal"
	"log"
)

type Runner struct {
	//系统信号通道
	interrupt chan os.Signal
	//任务执行结果通道
	complete  chan error
	//报告任务处理已经超时
	timeout   <-chan time.Time
	tasks     []func(int)
}
//超时错误
var ErrTimeout = errors.New("received timeout")
//系统终端错误
var ErrInterrupt = errors.New("received interrupt")

//返回一个新的准备使用的Runner
func New(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		complete:  make(chan error),
		timeout:   time.After(d), //After函数会使用goroutine启动一个timer, timer时间到后向channel写入Time
	}
}

//向Runner中添加task
func (r *Runner) AddTask(tasks ... func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

func (r *Runner) Start() error {
	//希望接收所有终端信号
	signal.Notify(r.interrupt, os.Interrupt)

	go func() {
		//使用goroutine执行任务
		r.complete <- r.run()
	}()

	select { //main线程在select处阻塞, 要么等待任务执行结果结束, 要么等待计时器报告超时
	case err := <-r.complete://阻塞等待任务执行结果
		return err
	case <-r.timeout: //阻塞等待超时报告
		return ErrTimeout
	}
}

func (r *Runner) run() error {
	for id, task := range r.tasks {
		//检测是否有来自操作系统的终端信号
		if r.getInterrupted() {
			return ErrInterrupt
		}
		//执行任务
		task(id)
	}
	return nil
}

func (r *Runner) getInterrupted() bool {
	//使用default将select的阻塞变成非阻塞. 每次方法调用只是检查通道中是否有数据, 不阻塞
	select {
	case <-r.interrupt:
		return true
	default:
		return false
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main
import (
	"time"
	"os"
	"log"

	"github.com/addozhang/learning-go-lang/runner"
)

func main(){
	log.Println("Starting working.")

	const timeout = 3 * time.Second
	r := runner.New(timeout)

	r.AddTask(createTask(), createTask(), createTask())

	if err := r.Start(); err != nil {
		switch err {
		case runner.ErrTimeout:
			log.Println("Terminating due to timeout.")
			os.Exit(1)
		case runner.ErrInterrupt:
			log.Println("Terminating due to interrupt.")
			os.Exit(2)
		}
	}
	log.Println("Process ended.")
}
//创建任务, 返回接受int类型参数的函数
func createTask() func(int){
	return func(id int) {
		log.Printf("Processor - Task #%d", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}
//创建任务, 返回接受int类型参数的函数
func createTask() func(int){
	return func(id int) {
		log.Printf("Processor - Task #%d", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}

//结果输出
//2018/01/01 09:45:57 Starting working.
//2018/01/01 09:45:57 Processor - Task #0
//2018/01/01 09:45:57 Processor - Task #1
//2018/01/01 09:45:58 Processor - Task #2
//2018/01/01 09:46:00 Terminating due to timeout.

pool

下面的代码展示如何使用有缓冲通道实现资源池, 以1.5版本为基础写的. 1.6之后的版本, 标准库中自带了资源池的实现sycn.Pool

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package pool

import (
	"sync"
	"io"
	"errors"
	"log"
)

type Pool struct {
	m         sync.Mutex                //互斥锁用于安全地方访问资源池
	resources chan io.Closer            //资源池通道, 需要实现io.Closer接口
	factory   func() (io.Closer, error) //创建资源的工厂方法
	closed    bool                      //资源池是否关闭
}

//资源池关闭错误
var ErrPoolClosed = errors.New("Pool has ben closed.")

func New(fn func() (io.Closer, error), size int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Size value too small.")
	}
	return &Pool{
		resources: make(chan io.Closer, size), //使用有缓冲资源池
		factory:   fn,
	}, nil
}

//从池中获取资源
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case res, ok := <-p.resources: //从资源池通道获取一个资源, 因为有default, 不阻塞
		log.Println("Acqure: ", "Shared Resources")
		if !ok {
			return nil, ErrPoolClosed
		}
		return res, nil

	default: //资源池通道没有数据时, 新建一个
		log.Println("Acquire: ", "New Resource")
		return p.factory()
	}
}

//释放资源
func (p *Pool) Release(res io.Closer) {
	p.m.Lock() //需要使用互斥锁操作资源池
	defer p.m.Unlock()

	if p.closed { //
		res.Close()
		return
	}

	select {
	case p.resources <- res: //将资源放回通道. 如果通道满不会阻塞, 因为有default
		log.Println("Release: ", "In Queue")

	default: //如果通道已满, 直接关闭资源
		log.Println("Release: ", "Closing")
		res.Close()
	}
}

//关闭资源池
func (p *Pool) Close() {
	p.m.Lock() //加互斥锁
	defer p.m.Unlock()

	if p.closed {
		return
	}

	//将池关闭
	p.closed = true

	//在清空通道资源之前关闭通道, 如果不关闭会发声死锁
	close(p.resources)

	for res := range p.resources {
		res.Close() //关闭通道中的资源
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
	"github.com/addozhang/learning-go-lang/pool"
	"log"
	"io"
	"sync/atomic"
	"sync"
	"time"
	"math/rand"
)

const (
	maxGoRoutines   = 25
	pooledResources = 2
)

type dbConnection struct {
	ID int32
}

var idCounter int32

func (dbConn *dbConnection) Close() error {
	log.Println("Close: Connection, ", dbConn.ID)
	return nil
}

func createConnection() (io.Closer, error) {
	id := atomic.AddInt32(&idCounter, 1)
	log.Println("Create: New Connection", id)
	return &dbConnection{id}, nil
}

func main() {
	var wg sync.WaitGroup
	wg.Add(maxGoRoutines)

	p, err := pool.New(createConnection, pooledResources)
	if err != nil {
		log.Println(err)
	}
	for query := 0; query < maxGoRoutines; query++ {
		go func(q int) {
			performQuery(q, p)
			wg.Done()
		}(query)
	}

	wg.Wait()
	log.Println("Shutdown Program.")
	p.Close()
}
func performQuery(query int, pool *pool.Pool) {

	dbConn, err := pool.Acquire()
	if err != nil {
		log.Println(err)
	}

	defer dbConn.Close()
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	log.Printf("QID[%d] CID[%d]", query, dbConn.(*dbConnection).ID)
}

work

下面的代码展示了如何使用无缓冲通道来创建一个goroutine池. 这个goroutine执行并控制一组工作, 让其并发执行.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package worker

import "sync"

type Worker interface {
	Task()
}

type Pool struct {
	worker chan Worker
	wg     sync.WaitGroup
}

func New(maxRoutines int) *Pool {
	p := Pool{
		worker: make(chan Worker),
	}

	p.wg.Add(maxRoutines)

	for i := 0; i < maxRoutines; i++ {
		go func() {
			for w := range p.worker {
				w.Task()
			}
			p.wg.Done()
		}()
	}

	return &p;
}

func (p *Pool) Run(w Worker) {
	p.worker <- w
}

func (p *Pool) Shutdown() {
	close(p.worker)
	p.wg.Wait()
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
	"log"
	"time"
	worker2 "github.com/addozhang/learning-go-lang/worker"
	"sync"
)

var names = []string{
	"bob",
	"steve",
	"mary",
	"therese",
	"json",
}

type namePrinter struct {
	name string
}

func (np *namePrinter) Task() {
	log.Print(np.name)
	time.Sleep(time.Second / 10)
}

func main() {
	p := worker2.New(2)

	var wg sync.WaitGroup
	wg.Add(100 * len(names))

	for i := 0; i < 100; i++ {
		for _, name := range names{
			np := namePrinter{name}
			go func() {
				p.Run(&np)
				wg.Done()
			}()
		}
	}
	wg.Wait()
	p.Shutdown()
}

Comments

comments powered by Disqus