2.7 并发编程
go协程
func main() { //两个交错输出 go sayHello() go sayHello2() time.Sleep(time.Second * 3) //阻塞主线程 } func sayHello() { for i := 0; i < 30; i++ { fmt.Println("hello world") } } func sayHello2() { for i := 0; i < 30; i++ { fmt.Println("你好中国") } }
//通过sync.WaitGroup来等待所有线程完成 package main import ( "fmt" "sync" ) func main() { var w = &sync.WaitGroup{} w.Add(2) go sayEn(w) go sayZh(w) w.Wait() } func sayEn(w *sync.WaitGroup) { for i := 0; i < 30; i++ { fmt.Println("hello world") } w.Done() //每当这个方法完成则减少1 } func sayZh(w *sync.WaitGroup) { for i := 0; i < 30; i++ { fmt.Println("中国你好") } w.Done() //每当这个方法完成则减少1 }
go管道
管道的定义:
//无缓冲管道 flag := make(chan bool) //有缓冲管道 data := make(chan int,10) //向管道中添加值 data <- 10 //从管道中取值 agr := <- data <- data //也可以直接释放值,不用变量接收
1. 通过go实现同步
package main import ( "fmt" ) func main() { w1,w2 := make(chan bool),make(chan bool) go sayEn_chan(w1) go sayZh_chan(w2) <- w1 //阻塞,直到chan 可以取出数据 <- w2 } func sayEn_chan(w chan bool) { for i := 0; i < 30; i++ { fmt.Println("hello world") } w <- true //方法完成写入通道 } func sayZh_chan(w chan bool) { for i := 0; i < 30; i++ { fmt.Println("中国你好") } w <- true }
2. 正确处理累加
package main import ( "fmt" "sync/atomic" ) var ( count int64 ) func main() { w1,make(chan bool) go add(w1) go add(w2) <- w1 //阻塞,直到chan 可以取出数据 <- w2 fmt.Println(count) } func add(w chan bool) { for i := 0; i < 5000; i++ { atomic.AddInt64(&count,1) } w <- true }
3. 通道实现数据共享
package main import ( "fmt" "math/rand" "sync" ) var wg sync.WaitGroup func main() { count := make(chan int) wg.Add(2) go player("张三",count) go player("李四",count) //发球 count <- 1 wg.Wait() //阻塞等待2个线程完成 } func player(name string,count chan int) { defer wg.Done() for { i,ok := <-count if !ok { //通道关闭 fmt.Printf("运动员 %s 赢了\n",name) return } tmp := rand.Intn(100) if tmp % 13 == 0 { //没有接到球 fmt.Printf("运动员 %s 输了\n",name) close(count) return } fmt.Printf("运动员 %s 击球 %d \n",name,i) i ++ count <- i } }
4. 缓冲管道
package main import ( "fmt" "sync" "time" ) var ( numberTasks = 10 workers = 4 ) var wg2 sync.WaitGroup func main() { wg2.Add(workers) tasks := make(chan int,numberTasks) for i := 0; i < workers; i++ { go work(tasks,i) } for j := 1; j <= numberTasks; j++ { tasks <- j } close(tasks) wg2.Wait() } func work(tasks chan int,worker int) { defer wg2.Done() for { task,ok := <- tasks if !ok { fmt.Printf("任务完成,工号:%d\n",worker) return } fmt.Printf("工号:%d,开始工作:%d\n",worker,task) time.Sleep(time.Microsecond * 100) fmt.Printf("工号:%d,完成工作:%d\n",task) } }
5. select
select 的特点是:不会阻塞,哪个管道有值,我取哪个。所以,下面当运行到go的时候,a,b还没有添值,所以只能选择defaul运行,这里可以把
defualt
部分和b<-2
去掉,select会被阻塞,直到a<-1执行
func main() { a := make(chan int) b := make(chan int) go func() { b <- 2 time.Sleep(time.Second * 3) a <- 1 }() select { case <- a: fmt.Println("a") case <- b: fmt.Println("b") time.Sleep(time.Second * 3) default: fmt.Println("hello world") } }
6. runner并发模型
package runner import ( "errors" "os" "os/signal" "time" ) type Runner struct { interrupt chan os.Signal complete chan error timeout <-chan time.Time //声明一个只读的管道 tasks []func(int) } var ErrorTimeout = errors.New("receive timeout") var ErrorInterrupt = errors.New("interrupt error") func New(duration time.Duration) *Runner { return &Runner{ interrupt: make(chan os.Signal,1),complete: make(chan error),timeout: time.After(duration),} } func (r *Runner) Add(tasks...func(int)) { r.tasks = append(r.tasks,tasks...) } func (r *Runner) getInterrupt() bool { select { case <-r.interrupt: signal.Stop(r.interrupt) return true default: return false } } func (r *Runner) run() error { for id,task := range r.tasks { if r.getInterrupt() { return ErrorInterrupt } task(id) } return nil } func (r *Runner) Start() error { signal.Notify(r.interrupt,os.Interrupt) go func() { r.complete <- r.run() }() select { case err := <- r.complete: return err case <- r.timeout: return ErrorTimeout } }
测试
package main import ( "gorounting/runner" "log" "os" "time" ) const ( timeout = 4 * time.Second ) func main() { log.Println("任务开始") ru := runner.New(timeout) ru.Add(createTask(),createTask(),createTask()) if err := ru.Start(); err != nil { switch err { case runner.ErrorInterrupt: log.Println("系统被中断") os.Exit(1) case runner.ErrorTimeout: log.Println("系统超时") os.Exit(2) } } log.Println("程序结束") } func createTask() func(int) { return func(id int) { log.Printf("process-task #%d\n",id) time.Sleep(time.Duration(id) * time.Second ) } }