问题描述
我试图了解TPL中的异常处理。
以下代码似乎吞没了异常:
var processor = new ActionBlock<int>((id) => SomeAction(id),new ExecutionDataflowBlockOptions { ... });
async Task SomeAction(int merchantID)
{
//Exception producing code
...
}
收听TaskScheduler.UnobservedTaskException
事件也不会收到任何消息。
那么,这是否意味着动作块在运行动作时本身会进行try-catch?
在某个地方有任何官方文件吗?
解决方法
更新
DataFlow块的异常处理行为在Exception Handling in TPL DataFlow Networks
中进行了说明**原图
此代码不会吞噬异常。如果您等待该块以await processor.Completion
完成,则会得到异常。如果在调用Complete()
之前使用循环将消息泵送到块,则也需要一种方法来停止循环。一种方法是使用CancellationTokenSource
并在出现异常的情况下发出信号:
void SomeAction(int i,CancellationTokenSource cts)
{
try
{
...
}
catch(Exception exc)
{
//Log the error then
cts.Cancel();
//Optionally throw
}
}
发布代码不必进行太多更改,只需要检查是否
var cts=new CancellationTokenSource();
var token=cts.Token;
var dopOptions=new new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism=10,CancellationToken=token
};
var block= new ActioBlock<int>(i=>SomeAction(i,cts),dopOptions);
while(!token.IsCancellationRequested && someCondition)
{
block.Post(...);
}
block.Complete();
await block.Completion;
引发动作时,会发出令牌信号,并且该块结束。如果该操作将异常抛出,则await block.Completion
也将异常抛出。
如果这看起来有些令人费解,那是因为这在某种程度上是块的边缘情况。 DataFlow用于创建块的管道或网络。
一般情况
名称Dataflow有意义。
您不必使用相互调用的方法来构建程序,而是要具有处理模块,这些模块可以相互传递消息。没有父方法可以接收结果和异常。块的流水线保持活动状态,以无限期地接收和处理消息,直到某个外部控制器指示其停止为止,例如,通过在头块上调用Complete
或发信号通知将CancellationToken传递给每个块。
即使是独立的ActionBlock,块也不应该允许发生未处理的异常。如您所见,除非您已经调用Complete()
和await Completion
,否则您将不会获得异常。
当块内发生未处理的异常时,该块将进入故障状态。该状态传播到与PropagateCompletion
选项链接的所有下游块。 上游块不受影响,这意味着它们可以继续工作,将消息存储在其输出缓冲区中,直到该进程用尽内存为止,否则将死锁,因为它没有收到来自这些块的响应。
正确的故障处理
该块应根据应用程序的逻辑捕获异常并决定如何处理:
- 记录并继续处理。这与Web应用程序的工作方式没有什么不同-请求期间的异常不会使服务器停机。
- 将错误消息显式发送到另一个块。可以,但是这种类型的硬编码不是很像数据流。
- 使用带有某种错误指示符的消息类型。也许是一个
Success
标志,也许是一个包含消息或错误的Envelope<TMessage>
对象。 - 通过发信号通知所有块以发出信号,以发信号取消
CancellationTokenSource
来产生所有块所使用的CancellationToken
,从而取消整个流水线。这相当于普通程序中的throw
。
#3是最通用的选项。下游块可以检查信封并忽略或传播失败的消息,而无需进行处理。本质上,失败的消息会绕过下游块。
另一种选择是使用predicate
中的LinkTo
参数,并将失败的消息发送到记录器块,并将成功的消息发送到下一个下游块。在复杂的情况下,这可以用于例如重试某些操作并将结果发送到下游。
这些概念和图像来自Scott Wlaschin的Railway Oriented Programming
, TaskScheduler.UnobservedTaskException
事件不是处理故障任务异常的可靠/确定性方法,因为它被延迟直到垃圾收集器清除了故障任务。这可能在错误发生后很长时间发生。
数据流块吞没的唯一异常类型是OperationCanceledException
(出于未记录的原因,AFAIK)。所有其他异常都会导致块过渡到故障状态。有故障的块也有其Completion
属性(这是Task
)也有故障(processor.Completion.IsFaulted == true
)。您可以在Completion
属性上附加一个延续,以在块失败时接收通知。例如,您可以通过简单地使进程崩溃来确保异常不会被忽略:
processor.Completion.ContinueWith(t =>
{
ThreadPool.QueueUserWorkItem(_ => throw t.Exception);
},default,TaskContinuationOptions.OnlyOnFaulted,TaskScheduler.Default);
之所以可行,是因为在ThreadPool
上引发未处理的异常会导致应用程序终止(引发AppDomain.CurrentDomain.UnhandledException
事件之后)。
如果您的应用程序具有GUI(WinForms / WPF等),则可以在UI线程上引发异常,从而可以更优雅地处理错误:
var uiContext = SynchronizationContext.Current;
processor.Completion.ContinueWith(t =>
{
uiContext.Post(_ => throw t.Exception,null);
},TaskScheduler.Default);
这将引发WinForms中的Application.ThreadException
事件。