问题描述
我想用一组 goroutine worker 来保存数据,等所有数据都保存完成之后,再执行另一个函数对这些数据进行处理。目前的代码如下:
// s is a package-level value whose only field is the job channel: Foo creates
// the channel, and allocateJob sends into it and then closes it.
var s struct {
// jobs carries the Data items that are waiting to be processed.
jobs chan Data
}
// allocateJob sends every element of data into s.jobs, in slice order, and
// then closes the channel so that receivers ranging over it terminate.
//
// Each send blocks while the channel is full, which is why callers run this
// function in its own goroutine. s.jobs must have been created before the
// call.
func allocateJob(data []Data) {
	for _, d := range data {
		s.jobs <- d
	}
	close(s.jobs)
}
func Foo() (err error) {
resultData = GetData()
s.jobs = make(chan Data,NumOfWorkers)
go allocateJob(resultData)
var wg sync.WaitGroup
for i := 1; i <= NumOfWorkers; i++ {
wg.Add(1)
go func() {
for job := range jobs {
err = s.saveData(ctx,job) // i want to wait thise till finish save all data
wg.Done()
err = s.ManipulateDataSomething(ctx,job)
wg.Done()
}
wg.Done()
}()
}
wg.Wait()
return err
}
有没有办法做到这一点?正确的做法是什么?我刚接触并发和 goroutine,希望我的问题描述得足够清楚。
解决方法
这是一个非常简单的示例:
package main
import (
"fmt"
"sync"
)
// job is a unit of work: it wraps the function to execute.
type job struct {
// do is the function invoked by Do; it may be nil.
do func()
}
// Do invokes the wrapped function. A job whose function is nil is a no-op.
func (j job) Do() {
	fn := j.do
	if fn == nil {
		return
	}
	fn()
}
// workerPool groups a set of workers together with the pool's channels.
type workerPool struct {
// workers holds the workers that Start launches.
workers []worker
// stop is closed by Stop.
stop chan struct{}
// jobs is the queue that Do sends jobs into.
jobs chan job
}
// newWorkerPool builds a pool of numWorkers workers that all share one stop
// channel and one job queue. Values of numWorkers below 1 are treated as 1.
// The workers do nothing until Start is called on the returned pool.
func newWorkerPool(numWorkers int) *workerPool {
	if numWorkers < 1 {
		numWorkers = 1
	}

	// stop is the channel every worker watches to know when to exit.
	stop := make(chan struct{}, 1)
	// jobs is the job queue; it buffers at most 100 pending jobs.
	jobs := make(chan job, 100)

	// Each worker gets the same two channels so that any idle worker can pick
	// up the next job.
	workers := make([]worker, numWorkers)
	for i := range workers {
		workers[i] = worker{
			stop: stop,
			jobs: jobs,
		}
	}

	return &workerPool{
		workers: workers,
		stop:    stop,
		// The pool must hold the same queue the workers read from; leaving
		// this field nil would make every send on it block forever.
		jobs: jobs,
	}
}
// Start calls Start on every worker in the pool, in slice order.
func (wp *workerPool) Start() {
	for idx := 0; idx < len(wp.workers); idx++ {
		w := &wp.workers[idx]
		w.Start()
	}
}
// Stop closes the pool's stop channel, which is the signal the workers' loops
// wait for in order to return. Jobs still waiting in the queue at that moment
// are not guaranteed to run. Stop must be called at most once: closing an
// already closed channel panics.
func (wp *workerPool) Stop() {
close(wp.stop)
}
// Do wraps fn in a job and sends it to the pool's job queue. The send blocks
// for as long as the queue cannot accept another job.
func (wp *workerPool) Do(fn func()) {
	queued := job{do: fn}
	wp.jobs <- queued
}
// worker runs the jobs it receives on jobs until a receive on stop succeeds.
type worker struct {
// stop tells the worker's loop to return.
stop chan struct{}
// jobs delivers the jobs to execute.
jobs chan job
}
// Start runs the worker's receive loop (start) in a new goroutine and returns
// immediately.
func (w *worker) Start() {
go w.start()
}
// start is the worker's loop: it executes each job received on w.jobs and
// returns as soon as a receive on w.stop succeeds. When both channels are
// ready at once, select picks one of them at random.
func (w *worker) start() {
	for {
		select {
		case next := <-w.jobs:
			next.Do()
		case <-w.stop:
			return
		}
	}
}
// main demonstrates the worker pool: it queues five "Hello" jobs and three
// "Hi" jobs and returns only after all eight have run.
func main() {
	const (
		helloJobs = 5
		hiJobs    = 3
	)

	// Create a worker pool with 4 workers and tell them to start working.
	wp := newWorkerPool(4)
	wp.Start()
	// Signal the workers to exit when main returns.
	defer wp.Stop()

	// helloWg tracks the "Hello" jobs and hiWg tracks the "Hi" jobs.
	var (
		helloWg sync.WaitGroup
		hiWg    sync.WaitGroup
	)

	// Each job marks itself done on its wait group once it has printed.
	sayHello := func() {
		defer helloWg.Done()
		fmt.Println("Hello")
	}
	sayHi := func() {
		defer hiWg.Done()
		fmt.Println("Hi")
	}

	// Queue the "Hello" jobs.
	helloWg.Add(helloJobs)
	for i := 0; i < helloJobs; i++ {
		wp.Do(sayHello)
	}

	// Queue the "Hi" jobs from a separate goroutine. The counter is raised
	// here, before the goroutine starts: if Add ran inside the goroutine,
	// hiWg.Wait below could observe a zero counter and return before any
	// "Hi" job had even been queued.
	hiWg.Add(hiJobs)
	go func() {
		for i := 0; i < hiJobs; i++ {
			wp.Do(sayHi)
		}
	}()

	// Wait for all "Hello" jobs, then for all "Hi" jobs.
	helloWg.Wait()
	hiWg.Wait()
}