一堆goroutine中的嵌套errgroup

问题描述

我对golang及其并发原则还很陌生。我的用例涉及对一批实体执行多个http请求(针对单个实体)。如果对实体的任何http请求均失败,则我需要停止对其的所有并行http请求。另外,我还必须管理因错误而失败的实体的数量。我正在尝试在实体goroutine中实现错误组,以便如果单个实体的任何http请求失败,则错误组将终止并将错误返回给其父goroutine。但是我不确定如何维护错误计数。

func main(entity[] string) {
    errorC := make(chan string) // channel to insert failed entity
    var wg sync.WaitGroup

    for _,link := range entity {
        wg.Add(1)
        // Spawn errorgroup here. errorgroup_spawn
    }

    go func() {
        wg.Wait()   
        close(errorC)    
    }()

    for msg := range errorC {
        // here storing error entityIds somewhere.
    }
}

和这样的错误组

func errorgroup_spawn(ctx context.Context,errorC chan string,wg *sync.WaitGroup) { // and other params
    defer (*wg).Done()
    
   goRoutineCollection,ctxx := errgroup.WithContext(ctx)
    results := make(chan *result)
    goRoutineCollection.Go(func() error {
        // http calls for single entity
        // if error occurs,push it in errorC,and return Error.
        return nil
    })

    go func() {
        goRoutineCollection.Wait()
        close(result)
    }()

   return goRoutineCollection.Wait()
}

PS:我还考虑过应用嵌套的错误组,但无法考虑在运行其他错误组时维护错误计数 谁能指导我,这是处理这种现实情况的正确方法吗?

解决方法

跟踪错误的一种方法是使用状态结构来跟踪哪个错误来自何处:

type Status struct {
   Entity string
   Err error
}
...

errorC := make(chan Status) 

// Spawn error groups with name of the entity,and when error happens,push Status{Entity:entityName,Err:err} to the chanel

然后您可以从错误通道中读取所有错误,并找出导致失败的原因。

另一个选择是根本不使用错误组。这使事情更加明确,但是是否更好还是有争议的:

// Keep entity statuses
statuses:=make([]Status,len(entity))
for i,link := range entity {
   statuses[i].Entity=link
   wg.Add(1)
   go func(i index) {
      defer wg.Done()
      ctx,cancel:=context.WithCancel(context.Background())
      defer cancel()

      // Error collector
      status:=make(chan error)
      defer close(status)
      go func() {
         for st:=range status {
             if st!=nil {
                cancel()  // Stop all calls 
                // store first error
                if statuses[i].Err==nil {
                   statuses[i].Err=st
                }
             }
         }
      }()

      innerWg:=sync.WaitGroup{}
      innerWg.Add(1)
      go func() {
         defer innerWg.Done()
         status<- makeHttpCall(ctx)
      }()
      innerWg.Add(1)
      go func() {
         defer innerWg.Done()
         status<- makeHttpCall(ctx)
      }()
      ...
      innerWg.Wait()

   }(i)
}

完成所有操作后,statuses将包含所有实体和相应的状态。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...