导致TPL Dataflow管道死锁的异常

问题描述

我使用BufferBlockTransformBlockActionBlock创建了DataFlow管道。由于TransformBlock中的异常,该应用程序将死锁。我正在使用BoundedCapacity限制数据。

我的代码是这样的:

public async Task PerformOperation()
{
    var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
    var fetchApiResponse = new TransformBlock<ObjA,ObjA>((item) => {
        //Call an api to fetch result.
        //Here for some data i get exception
        return ObjA;
    },new ExecutionDataflowBlockOptions { BoundedCapacity = 2,MaxDegreeOfParallelism = 2,CancellationToken = cancellationToken });
    var finalBlock = new ActionBlock<ObjA>((item) => {
        if (item != null)
        {
            SavetoDB(item);
        }
    },new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1,BoundedCapacity = 1,CancellationToken = cancellationToken });

    bufferBlock.LinkTo(fetchApiResponse,new DataflowLinkOptions { PropagateCompletion = true });
    fetchApiResponse.LinkTo(finalBlock,new DataflowLinkOptions { PropagateCompletion = true });

    await FetchData(bufferBlock);
    bufferBlock.Complete();
    await Task.WhenAll(fetchApiResponse.Completion,finalBlock.Completion);
}
public async Task FetchData(bufferBlock)
{
    List<ObjA> dataToProcessList = GetFromDB();
    foreach (var item in dataToProcessList)
    {
        await bufferBlock.SendAsync(item);
    }
}

如果fetchApiResponse块中出现异常,则数据不会移动并且会导致死锁。 如何处理该管道中的异常?
这里大约有200,000条记录被推送到bufferBlock。
在不引起死锁的情况下处理异常的最佳方法是什么?

更新1: 还添加FetchData方法

谢谢 比尼尔

解决方法

这些块应该 not 允许未处理的异常,而不是试图弄清什么是错误的。这是一种非常常见的模式,也用于[Go管道] 9https://blog.golang.org/pipelines)

文章Exception Handling in TPL Dataflow Networks解释了如何处理异常。

  • 抛出未处理的异常时,仅在所有并发操作完成后,块才进入故障状态。
  • 该状态将传播到将PropagateCompletion设置为true的链接块。但这并不意味着下游块会立即发生故障。

等待错误的块引发。该行:

await Task.WhenAll(fetchApiResponse.Completion,finalBlock.Completion);

应该扔掉,除非那些块还很忙。

解决方案-不允许未处理的异常

相反,返回一个Result对象。当必须进行例如1000个HTTP调用时,让一个异常阻止其他900个调用将是一个坏主意。这与Railroad-oriented programming大致相似。数据流管道与功能管道非常相似。

每个块应返回一个Result<T>类,该类包装实际结果,并以某种方式指示成功或失败。异常处理块应捕获任何异常并返回错误的Result<T>项目。 LinkTo方法可以具有一个谓词,该谓词允许将失败的结果重定向到例如日志记录块或NullBlock。

假设我们有一个简单的Result<T>

class Result<T>
{
    public T Value{get;}
    public Exception Exception{get;}
    public bool Ok {get;}

    public Result(){}

    public Result(T value)
    {
        Value=value;
        Ok=true;
    }

    public Result(Exception exc)
    {
        Exception=exc;
        Ok=false;
    }
}

fetchApiResponse可能是:

    var fetchApiResponse = new TransformBlock<TA,Result<TA>>((item) => {
        try
        {
            ...
            return new Result(ObjA,true);
        }
        catch(Exception exc)
        {
            return new Result(exc);
        }
    },new ExecutionDataflowBlockOptions { BoundedCapacity = 2,MaxDegreeOfParallelism = 2,CancellationToken = cancellationToken });

LinkTo代码可能是:

var propagate=new DataflowLinkOptions { PropagateCompletion = true };

var nullBlock=DataflowBlock.NullTarget<Result<TA>>();
fetchApiResponse.Linkto(logger,propagage,msg=>!msg.Ok);
fetchApiResponse.LinkTo(finalBlock,propagate,msg=>msg.Ok);

在这种情况下,错误消息将被简单地转储为空块。

没有理由使用另一个缓冲区块或等待所有块。 TransformBlock和ActionBlock都有一个由ExecutionDataflowBlockOptions选项控制的输入缓冲区。

发布消息并等待完成可以是:

await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;

如果finalBlock在没有有效结果的情况下返回空的fetchApiResponse对象,也可以删除Result中的空检查。

更复杂的Result对象可以处理更复杂的场景。

突然终止

即使管道需要立即终止,也不应该存在任何未处理的异常。故障可能会向下游传播,但不会影响上游块。他们会将消息保存在内存中,即使管道的其余部分中断了,也将继续接受输入。

那肯定看起来像是一个僵局。

解决方案是使用CancellationTokenSource,将其令牌传递给所有块,并在需要终止管道的情况下发出信号。

这是常见的做法,例如在Go中,正是出于这个原因,使用了诸如CancellationTokenSource之类的通道,并取消了下游上游块。 Go Concurrency Patterns: Pipelines and cancellation

中对此进行了描述

如果块确定没有理由继续工作,而不仅仅是在发生错误的情况下,提前取消很有用。在这种情况下,它可以单独使用CancellationTokenSource停止上游块

,

我无法浏览@Panagiotis Kanavos的帖子。同时,我已经更新了这样的代码以根据注释处理异常。

public async Task PerformOperation()
{
  try
   {
    var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 
    });
    var fetchApiResponse = new TransformBlock<ObjA,ObjA>((item) => {
        //Call an api to fetch result.
        //Here for some data i get exception
        try
        {
          int apiResult = await apiCall();
        }
        catch(Exception ex)
        {
         **var dataflowBlock = (IDataflowBlock)bufferBlock;
          dataflowBlock.Fault(ex);
          throw ex;**
        }
        return ObjA;
    },CancellationToken = cancellationToken });
    var finalBlock = new ActionBlock<ObjA>((item) => {
        if (item != null)
        {
            SaveToDB(item);
        }
    },new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1,BoundedCapacity = 1,CancellationToken = cancellationToken });

    bufferBlock.LinkTo(fetchApiResponse,new DataflowLinkOptions { PropagateCompletion = true });
    fetchApiResponse.LinkTo(finalBlock,new DataflowLinkOptions { PropagateCompletion = true });

    await FetchData(bufferBlock);
    bufferBlock.Complete();
    await Task.WhenAll(fetchApiResponse.Completion,finalBlock.Completion);
  }
  catch(AggregateException aex)
  {   //logging the exceptions in aex  }
  catch(Exception ex)
  { //logging the exception}
}
public async Task FetchData(bufferBlock)
{
    List<ObjA> dataToProcessList = GetFromDB();
    foreach (var item in dataToProcessList)
    {
        if(!await bufferBlock.SendAsync(item))
        {
          break; //breaking the loop to stop pushing data.
        }
    }
}

这现在将停止管道并且不会陷入僵局。由于我要处理大量数据,因此我计划为异常添加一个计数器,如果计数器超出一定限制,则仅我将停止管道。如果网络出现小故障,则一个api调用失败,则可能适用于下一个数据。

我将遍历新文章并更新代码以使情况变得更好。 请提供输入。

谢谢 比尼尔