问题描述
我对golang及其并发原则还很陌生。我的用例涉及对一批实体执行多个http请求(针对单个实体)。如果对实体的任何http请求均失败,则我需要停止对其的所有并行http请求。另外,我还必须管理因错误而失败的实体的数量。我正在尝试在实体goroutine中实现错误组,以便如果单个实体的任何http请求失败,则错误组将终止并将错误返回给其父goroutine。但是我不确定如何维护错误计数。
func main(entity[] string) {
errorC := make(chan string) // channel to insert failed entity
var wg sync.WaitGroup
for _,link := range entity {
wg.Add(1)
// Spawn errorgroup here. errorgroup_spawn
}
go func() {
wg.Wait()
close(errorC)
}()
for msg := range errorC {
// here storing error entityIds somewhere.
}
}
和这样的错误组
func errorgroup_spawn(ctx context.Context,errorC chan string,wg *sync.WaitGroup) { // and other params
defer (*wg).Done()
goRoutineCollection,ctxx := errgroup.WithContext(ctx)
results := make(chan *result)
goRoutineCollection.Go(func() error {
// http calls for single entity
// if error occurs,push it in errorC,and return Error.
return nil
})
go func() {
goRoutineCollection.Wait()
close(result)
}()
return goRoutineCollection.Wait()
}
PS:我还考虑过应用嵌套的错误组,但无法考虑在运行其他错误组时维护错误计数 谁能指导我,这是处理这种现实情况的正确方法吗?
解决方法
跟踪错误的一种方法是使用状态结构来跟踪哪个错误来自何处:
type Status struct {
Entity string
Err error
}
...
errorC := make(chan Status)
// Spawn error groups with name of the entity,and when error happens,push Status{Entity:entityName,Err:err} to the chanel
然后您可以从错误通道中读取所有错误,并找出导致失败的原因。
另一个选择是根本不使用错误组。这使事情更加明确,但是是否更好还是有争议的:
// Keep entity statuses
statuses:=make([]Status,len(entity))
for i,link := range entity {
statuses[i].Entity=link
wg.Add(1)
go func(i index) {
defer wg.Done()
ctx,cancel:=context.WithCancel(context.Background())
defer cancel()
// Error collector
status:=make(chan error)
defer close(status)
go func() {
for st:=range status {
if st!=nil {
cancel() // Stop all calls
// store first error
if statuses[i].Err==nil {
statuses[i].Err=st
}
}
}
}()
innerWg:=sync.WaitGroup{}
innerWg.Add(1)
go func() {
defer innerWg.Done()
status<- makeHttpCall(ctx)
}()
innerWg.Add(1)
go func() {
defer innerWg.Done()
status<- makeHttpCall(ctx)
}()
...
innerWg.Wait()
}(i)
}
完成所有操作后,statuses
将包含所有实体和相应的状态。