数据块中的异常处理

问题描述

我试图了解TPL中的异常处理。

以下代码似乎吞没了异常:

var processor = new ActionBlock<int>((id) => SomeAction(id),new ExecutionDataflowBlockOptions { ... });


async Task SomeAction(int merchantID)
{
    //Exception producing code
    ...
}

收听TaskScheduler.UnobservedTaskException事件也不会收到任何消息。

那么,这是否意味着动作块在运行动作时本身会进行try-catch?

在某个地方有任何官方文件吗?

解决方法

更新

DataFlow块的异常处理行为在Exception Handling in TPL DataFlow Networks

中进行了说明

**原图

此代码不会吞噬异常。如果您等待该块以await processor.Completion完成,则会得到异常。如果在调用Complete()之前使用循环将消息泵送到块,则也需要一种方法来停止循环。一种方法是使用CancellationTokenSource并在出现异常的情况下发出信号:

void SomeAction(int i,CancellationTokenSource cts)
{
    try
    {
        ...
    }
    catch(Exception exc)
    {
        //Log the error then
        cts.Cancel();
       //Optionally throw
    }
}

发布代码不必进行太多更改,只需要检查是否

var cts=new CancellationTokenSource();
var token=cts.Token;
var dopOptions=new new ExecutionDataflowBlockOptions { 
                           MaxDegreeOfParallelism=10,CancellationToken=token
};
var block= new ActioBlock<int>(i=>SomeAction(i,cts),dopOptions);

while(!token.IsCancellationRequested && someCondition)
{
    block.Post(...);
}
block.Complete();
await block.Completion;

引发动作时,会发出令牌信号,并且该块结束。如果该操作将异常抛出,则await block.Completion也将异常抛出。

如果这看起来有些令人费解,那是因为这在某种程度上是块的边缘情况。 DataFlow用于创建块的管道或网络。

一般情况

名称Dataflow有意义。 您不必使用相互调用的方法来构建程序,而是要具有处理模块,这些模块可以相互传递消息。没有父方法可以接收结果和异常。块的流水线保持活动状态,以无限期地接收和处理消息,直到某个外部控制器指示其停止为止,例如,通过在头块上调用Complete或发信号通知将CancellationToken传递给每个块。

即使是独立的ActionBlock,块也不应该允许发生未处理的异常。如您所见,除非您已经调用Complete()await Completion,否则您将不会获得异常。

当块内发生未处理的异常时,该块将进入故障状态。该状态传播到与PropagateCompletion选项链接的所有下游块。 上游块不受影响,这意味着它们可以继续工作,将消息存储在其输出缓冲区中,直到该进程用尽内存为止,否则将死锁,因为它没有收到来自这些块的响应。

正确的故障处理

该块应根据应用程序的逻辑捕获异常并决定如何处理:

  1. 记录并继续处理。这与Web应用程序的工作方式没有什么不同-请求期间的异常不会使服务器停机。
  2. 将错误消息显式发送到另一个块。可以,但是这种类型的硬编码不是很像数据流。
  3. 使用带有某种错误指示符的消息类型。也许是一个Success标志,也许是一个包含消息或错误的Envelope<TMessage>对象。
  4. 通过发信号通知所有块以发出信号,以发信号取消CancellationTokenSource来产生所有块所使用的CancellationToken,从而取消整个流水线。这相当于普通程序中的throw

#3是最通用的选项。下游块可以检查信封并忽略或传播失败的消息,而无需进行处理。本质上,失败的消息会绕过下游块。

enter image description here

另一种选择是使用predicate中的LinkTo参数,并将失败的消息发送到记录器块,并将成功的消息发送到下一个下游块。在复杂的情况下,这可以用于例如重试某些操作并将结果发送到下游。

这些概念和图像来自Scott Wlaschin的Railway Oriented Programming

,

TaskScheduler.UnobservedTaskException事件不是处理故障任务异常的可靠/确定性方法,因为它被延迟直到垃圾收集器清除了故障任务。这可能在错误发生后很长时间发生。

数据流块吞没的唯一异常类型是OperationCanceledException(出于未记录的原因,AFAIK)。所有其他异常都会导致块过渡到故障状态。有故障的块也有其Completion属性(这是Task)也有故障(processor.Completion.IsFaulted == true)。您可以在Completion属性上附加一个延续,以在块失败时接收通知。例如,您可以通过简单地使进程崩溃来确保异常不会被忽略:

processor.Completion.ContinueWith(t =>
{
    ThreadPool.QueueUserWorkItem(_ => throw t.Exception);
},default,TaskContinuationOptions.OnlyOnFaulted,TaskScheduler.Default);

之所以可行,是因为在ThreadPool上引发未处理的异常会导致应用程序终止(引发AppDomain.CurrentDomain.UnhandledException事件之后)。

如果您的应用程序具有GUI(WinForms / WPF等),则可以在UI线程上引发异常,从而可以更优雅地处理错误:

var uiContext = SynchronizationContext.Current;
processor.Completion.ContinueWith(t =>
{
    uiContext.Post(_ => throw t.Exception,null);
},TaskScheduler.Default);

这将引发WinForms中的Application.ThreadException事件。