是在工作程序循环中使用多工作组的正确方法吗?

问题描述

在这种情况下,我想使用goroutine worker保存数据,然后等待保存数据完成,然后将新函数执行到操作数据,这就是这种情况

var s struct {
  jobs chan Data
}

func allocateJob([] Data) {
    for _,d := range data {
        s.jobs <- d
    }
    close(s.jobs)
}

func Foo() (err error) {
 resultData = GetData()

 s.jobs = make(chan Data,NumOfWorkers)
 go allocateJob(resultData)

 var wg sync.WaitGroup
 for i := 1; i <= NumOfWorkers; i++ {
    wg.Add(1)
    go func() {
        for job := range jobs {
            err = s.saveData(ctx,job) // i want to wait thise till finish  save all data
            wg.Done()
            err = s.ManipulateDataSomething(ctx,job)
            wg.Done()
        }
        wg.Done()
    }()
 }
 wg.Wait()
 return err
}

有可能做到这一点和正确的方法吗? 我对并发性和goroutine非常陌生,希望我问的有道理

解决方法

这是一个非常简单的示例:

package main

import (
    "fmt"
    "sync"
)

type job struct {
    do func()
}

func (j job) Do() {
    if j.do != nil {
        j.do()
    }
}

type workerPool struct {
    workers []worker
    stop chan struct{}
    jobs chan job
}

func newWorkerPool(numWorkers int) *workerPool {
    if numWorkers < 1 {
        numWorkers = 1
    }
    
    // stop denotes a channel to reclaim goroutine spawned by each workers.
    stop := make(chan struct{},1)
    
    // jobs denotes a job queue which able to queue at most 100 jobs.
    jobs := make(chan job,100)
    
    // workers denotes a worker thread for concurrent processing jobs.
    workers := make([]worker,numWorkers)
    for i := range workers {
        workers[i] = worker {
            stop: stop,jobs: jobs,}
    }
    
    return &workerPool {
        workers: workers,stop: stop,}
}

// Start spawns multiple worker routines.
func (wp *workerPool) Start() {
    for i := range wp.workers {
        wp.workers[i].Start()
    }
}

// Stop reclaim goroutine spawned each worker.
func (wp *workerPool) Stop() {
    close(wp.stop)
}

// Do create a job and queue it to a job queue.
func (wp *workerPool) Do(fn func()) {
    wp.jobs <- job{do:fn}
}

type worker struct {
    stop  chan struct{}
    jobs  chan job
}

func (w *worker) Start() {
    go w.start()
}

func (w *worker) start() {
    for {
        select {
        case <-w.stop:
            return
        case job := <-w.jobs:
            job.Do()
        }
    }
}

func main() {

    // Create a worker pool with 4 workers inside.
    wp := newWorkerPool(4)
    
    // Start the workerpool to tell workers prepare to work.
    wp.Start()
    defer wp.Stop()
    
    // Using this wait group to wait until all of say hello jobs are processed.
    var helloWg sync.WaitGroup
    
    // Using this wait group to wait until all of say hi jobs are processed.
    var hiWg sync.WaitGroup
    
    // Define function of saying hello.
    sayHello := func() { 
        defer helloWg.Done()
        fmt.Println("Hello")
    }
    
    // Define function of saying hi.
    sayHi := func() {
        defer hiWg.Done()
        fmt.Println("Hi")
    }
    
    // Let's say hello 5 times.
    for i := 0 ; i < 5 ; i++ {
        helloWg.Add(1)
        wp.Do(sayHello)
    }
    
    // Let's say hi 3 times.
    go func() {
        for i := 0 ; i < 3 ; i++ {
            hiWg.Add(1)
            wp.Do(sayHi)
        }
    }()
    
    // Wait for all say hello jobs.
    helloWg.Wait()
    
    // Wait for all say hi jobs.
    hiWg.Wait()
}

playgound

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...