Go In Action 读书笔记 四
并发模式
runner
runner展示了如何使用通道来监视程序的执行时间, 如果程序执行时间太长, 也可以用终止程序. 这个程序可用作corn作业执行
package runner
import (
"os"
"time"
"errors"
"os/signal"
"log"
)
type Runner struct {
//系统信号通道
interrupt chan os.Signal
//任务执行结果通道
complete chan error
//报告任务处理已经超时
timeout <-chan time.Time
tasks []func(int)
}
//超时错误
var ErrTimeout = errors.New("received timeout")
//系统终端错误
var ErrInterrupt = errors.New("received interrupt")
//返回一个新的准备使用的Runner
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d), //After函数会使用goroutine启动一个timer, timer时间到后向channel写入Time
}
}
//向Runner中添加task
func (r *Runner) AddTask(tasks ... func(int)) {
r.tasks = append(r.tasks, tasks...)
}
func (r *Runner) Start() error {
//希望接收所有终端信号
signal.Notify(r.interrupt, os.Interrupt)
go func() {
//使用goroutine执行任务
r.complete <- r.run()
}()
select { //main线程在select处阻塞, 要么等待任务执行结果结束, 要么等待计时器报告超时
case err := <-r.complete://阻塞等待任务执行结果
return err
case <-r.timeout: //阻塞等待超时报告
return ErrTimeout
}
}
func (r *Runner) run() error {
for id, task := range r.tasks {
//检测是否有来自操作系统的终端信号
if r.getInterrupted() {
return ErrInterrupt
}
//执行任务
task(id)
}
return nil
}
func (r *Runner) getInterrupted() bool {
//使用default将select的阻塞变成非阻塞. 每次方法调用只是检查通道中是否有数据, 不阻塞
select {
case <-r.interrupt:
return true
default:
return false
}
}
package main
import (
"time"
"os"
"log"
"github.com/addozhang/learning-go-lang/runner"
)
func main(){
log.Println("Starting working.")
const timeout = 3 * time.Second
r := runner.New(timeout)
r.AddTask(createTask(), createTask(), createTask())
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
//创建任务, 返回接受int类型参数的函数
func createTask() func(int){
return func(id int) {
log.Printf("Processor - Task #%d", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
//创建任务, 返回接受int类型参数的函数
func createTask() func(int){
return func(id int) {
log.Printf("Processor - Task #%d", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
//结果输出
//2018/01/01 09:45:57 Starting working.
//2018/01/01 09:45:57 Processor - Task #0
//2018/01/01 09:45:57 Processor - Task #1
//2018/01/01 09:45:58 Processor - Task #2
//2018/01/01 09:46:00 Terminating due to timeout.
pool
下面的代码展示如何使用有缓冲通道实现资源池, 以1.5版本为基础写的. 1.6之后的版本, 标准库中自带了资源池的实现sycn.Pool
package pool
import (
"sync"
"io"
"errors"
"log"
)
type Pool struct {
m sync.Mutex //互斥锁用于安全地方访问资源池
resources chan io.Closer //资源池通道, 需要实现io.Closer接口
factory func() (io.Closer, error) //创建资源的工厂方法
closed bool //资源池是否关闭
}
//资源池关闭错误
var ErrPoolClosed = errors.New("Pool has ben closed.")
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value too small.")
}
return &Pool{
resources: make(chan io.Closer, size), //使用有缓冲资源池
factory: fn,
}, nil
}
//从池中获取资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case res, ok := <-p.resources: //从资源池通道获取一个资源, 因为有default, 不阻塞
log.Println("Acqure: ", "Shared Resources")
if !ok {
return nil, ErrPoolClosed
}
return res, nil
default: //资源池通道没有数据时, 新建一个
log.Println("Acquire: ", "New Resource")
return p.factory()
}
}
//释放资源
func (p *Pool) Release(res io.Closer) {
p.m.Lock() //需要使用互斥锁操作资源池
defer p.m.Unlock()
if p.closed { //
res.Close()
return
}
select {
case p.resources <- res: //将资源放回通道. 如果通道满不会阻塞, 因为有default
log.Println("Release: ", "In Queue")
default: //如果通道已满, 直接关闭资源
log.Println("Release: ", "Closing")
res.Close()
}
}
//关闭资源池
func (p *Pool) Close() {
p.m.Lock() //加互斥锁
defer p.m.Unlock()
if p.closed {
return
}
//将池关闭
p.closed = true
//在清空通道资源之前关闭通道, 如果不关闭会发声死锁
close(p.resources)
for res := range p.resources {
res.Close() //关闭通道中的资源
}
}
package main
import (
"github.com/addozhang/learning-go-lang/pool"
"log"
"io"
"sync/atomic"
"sync"
"time"
"math/rand"
)
const (
maxGoRoutines = 25
pooledResources = 2
)
type dbConnection struct {
ID int32
}
var idCounter int32
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection, ", dbConn.ID)
return nil
}
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoRoutines)
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
for query := 0; query < maxGoRoutines; query++ {
go func(q int) {
performQuery(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("Shutdown Program.")
p.Close()
}
func performQuery(query int, pool *pool.Pool) {
dbConn, err := pool.Acquire()
if err != nil {
log.Println(err)
}
defer dbConn.Close()
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]", query, dbConn.(*dbConnection).ID)
}
work
下面的代码展示了如何使用无缓冲通道来创建一个goroutine池. 这个goroutine执行并控制一组工作, 让其并发执行.
package worker
import "sync"
type Worker interface {
Task()
}
type Pool struct {
worker chan Worker
wg sync.WaitGroup
}
func New(maxRoutines int) *Pool {
p := Pool{
worker: make(chan Worker),
}
p.wg.Add(maxRoutines)
for i := 0; i < maxRoutines; i++ {
go func() {
for w := range p.worker {
w.Task()
}
p.wg.Done()
}()
}
return &p;
}
func (p *Pool) Run(w Worker) {
p.worker <- w
}
func (p *Pool) Shutdown() {
close(p.worker)
p.wg.Wait()
}
package main
import (
"log"
"time"
worker2 "github.com/addozhang/learning-go-lang/worker"
"sync"
)
var names = []string{
"bob",
"steve",
"mary",
"therese",
"json",
}
type namePrinter struct {
name string
}
func (np *namePrinter) Task() {
log.Print(np.name)
time.Sleep(time.Second / 10)
}
func main() {
p := worker2.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
for _, name := range names{
np := namePrinter{name}
go func() {
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
p.Shutdown()
}