问题描述
我使用BufferBlock
,TransformBlock
和ActionBlock
创建了DataFlow管道。由于TransformBlock
中的异常,该应用程序将死锁。我正在使用BoundedCapacity
限制数据。
我的代码是这样的:
public async Task PerformOperation()
{
var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
var fetchApiResponse = new TransformBlock<ObjA,ObjA>((item) => {
//Call an api to fetch result.
//Here for some data i get exception
return ObjA;
},new ExecutionDataflowBlockOptions { BoundedCapacity = 2,MaxDegreeOfParallelism = 2,CancellationToken = cancellationToken });
var finalBlock = new ActionBlock<ObjA>((item) => {
if (item != null)
{
SavetoDB(item);
}
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1,BoundedCapacity = 1,CancellationToken = cancellationToken });
bufferBlock.LinkTo(fetchApiResponse,new DataflowLinkOptions { PropagateCompletion = true });
fetchApiResponse.LinkTo(finalBlock,new DataflowLinkOptions { PropagateCompletion = true });
await FetchData(bufferBlock);
bufferBlock.Complete();
await Task.WhenAll(fetchApiResponse.Completion,finalBlock.Completion);
}
public async Task FetchData(bufferBlock)
{
List<ObjA> dataToProcessList = GetFromDB();
foreach (var item in dataToProcessList)
{
await bufferBlock.SendAsync(item);
}
}
如果fetchApiResponse
块中出现异常,则数据不会移动并且会导致死锁。
如何处理该管道中的异常?
这里大约有200,000条记录被推送到bufferBlock。
在不引起死锁的情况下处理异常的最佳方法是什么?
谢谢 比尼尔
解决方法
这些块应该 not 允许未处理的异常,而不是试图弄清什么是错误的。这是一种非常常见的模式,也用于[Go管道] 9https://blog.golang.org/pipelines)
文章Exception Handling in TPL Dataflow Networks解释了如何处理异常。
- 抛出未处理的异常时,仅在所有并发操作完成后,块才进入故障状态。
- 该状态将传播到将
PropagateCompletion
设置为true
的链接块。但这并不意味着下游块会立即发生故障。
等待错误的块引发。该行:
await Task.WhenAll(fetchApiResponse.Completion,finalBlock.Completion);
应该扔掉,除非那些块还很忙。
解决方案-不允许未处理的异常
相反,返回一个Result
对象。当必须进行例如1000个HTTP调用时,让一个异常阻止其他900个调用将是一个坏主意。这与Railroad-oriented programming大致相似。数据流管道与功能管道非常相似。
每个块应返回一个Result<T>
类,该类包装实际结果,并以某种方式指示成功或失败。异常处理块应捕获任何异常并返回错误的Result<T>
项目。 LinkTo
方法可以具有一个谓词,该谓词允许将失败的结果重定向到例如日志记录块或NullBlock。
假设我们有一个简单的Result<T>
:
class Result<T>
{
public T Value{get;}
public Exception Exception{get;}
public bool Ok {get;}
public Result(){}
public Result(T value)
{
Value=value;
Ok=true;
}
public Result(Exception exc)
{
Exception=exc;
Ok=false;
}
}
fetchApiResponse
可能是:
var fetchApiResponse = new TransformBlock<TA,Result<TA>>((item) => {
try
{
...
return new Result(ObjA,true);
}
catch(Exception exc)
{
return new Result(exc);
}
},new ExecutionDataflowBlockOptions { BoundedCapacity = 2,MaxDegreeOfParallelism = 2,CancellationToken = cancellationToken });
和LinkTo
代码可能是:
var propagate=new DataflowLinkOptions { PropagateCompletion = true };
var nullBlock=DataflowBlock.NullTarget<Result<TA>>();
fetchApiResponse.Linkto(logger,propagage,msg=>!msg.Ok);
fetchApiResponse.LinkTo(finalBlock,propagate,msg=>msg.Ok);
在这种情况下,错误消息将被简单地转储为空块。
没有理由使用另一个缓冲区块或等待所有块。 TransformBlock和ActionBlock都有一个由ExecutionDataflowBlockOptions
选项控制的输入缓冲区。
发布消息并等待完成可以是:
await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;
如果finalBlock
在没有有效结果的情况下返回空的fetchApiResponse
对象,也可以删除Result
中的空检查。
更复杂的Result
对象可以处理更复杂的场景。
突然终止
即使管道需要立即终止,也不应该存在任何未处理的异常。故障可能会向下游传播,但不会影响上游块。他们会将消息保存在内存中,即使管道的其余部分中断了,也将继续接受输入。
那肯定看起来像是一个僵局。
解决方案是使用CancellationTokenSource
,将其令牌传递给所有块,并在需要终止管道的情况下发出信号。
这是常见的做法,例如在Go中,正是出于这个原因,使用了诸如CancellationTokenSource之类的通道,并取消了下游和上游块。 Go Concurrency Patterns: Pipelines and cancellation
中对此进行了描述如果块确定没有理由继续工作,而不仅仅是在发生错误的情况下,提前取消很有用。在这种情况下,它可以单独使用CancellationTokenSource停止上游块
,我无法浏览@Panagiotis Kanavos的帖子。同时,我已经更新了这样的代码以根据注释处理异常。
public async Task PerformOperation()
{
try
{
var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1
});
var fetchApiResponse = new TransformBlock<ObjA,ObjA>((item) => {
//Call an api to fetch result.
//Here for some data i get exception
try
{
int apiResult = await apiCall();
}
catch(Exception ex)
{
**var dataflowBlock = (IDataflowBlock)bufferBlock;
dataflowBlock.Fault(ex);
throw ex;**
}
return ObjA;
},CancellationToken = cancellationToken });
var finalBlock = new ActionBlock<ObjA>((item) => {
if (item != null)
{
SaveToDB(item);
}
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1,BoundedCapacity = 1,CancellationToken = cancellationToken });
bufferBlock.LinkTo(fetchApiResponse,new DataflowLinkOptions { PropagateCompletion = true });
fetchApiResponse.LinkTo(finalBlock,new DataflowLinkOptions { PropagateCompletion = true });
await FetchData(bufferBlock);
bufferBlock.Complete();
await Task.WhenAll(fetchApiResponse.Completion,finalBlock.Completion);
}
catch(AggregateException aex)
{ //logging the exceptions in aex }
catch(Exception ex)
{ //logging the exception}
}
public async Task FetchData(bufferBlock)
{
List<ObjA> dataToProcessList = GetFromDB();
foreach (var item in dataToProcessList)
{
if(!await bufferBlock.SendAsync(item))
{
break; //breaking the loop to stop pushing data.
}
}
}
这现在将停止管道并且不会陷入僵局。由于我要处理大量数据,因此我计划为异常添加一个计数器,如果计数器超出一定限制,则仅我将停止管道。如果网络出现小故障,则一个api调用失败,则可能适用于下一个数据。
我将遍历新文章并更新代码以使情况变得更好。 请提供输入。
谢谢 比尼尔