问题描述
我遇到了TPL ActionBlock
,它对于并行处理(异步)并行操作似乎非常方便。到目前为止,我正在使用Task.WhenAll()
(+ Semaphore
用于节流)。在例外方面,似乎有很大的不同:
var successList = new List<int>();
var FailedList = new List<int>();
try
{
var actionBlock = new ActionBlock<int>(
async x => await Task.Run(() =>
{
if (x < 5)
{
FailedList.Add(x);
throw new Exception(x.ToString());
}
successList.Add(x);
}),new ExecutionDataflowBlockOptions());
Enumerable.Range(1,10).Each(x => actionBlock.Post(x));
actionBlock.Complete();
await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
// works for approach using task.whenall
Console.WriteLine(ex);
Assert.True(FailedList.Count == 4);
Assert.True(successList.Count == 6);
return;
}
Assert.Fail();
该测试失败,因为ActionBlock
会在发生异常时立即停止。
我发现这是github上的问题:Dataflow: Add options for Task Faulting。显然,这种行为是不可配置的。
Task.WhenAll()
与这样的扩展方法结合:
public static async Task PreserveAllExceptions(this Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch
{
throw task.Exception;
}
}
将所有(!)异常包装在AggregateException
中,但继续处理:
await Task.WhenAll(task1,task2).PreserveAllExceptions().ConfigureAwait(false);
是否有方便的方法来使用ActionBlock
来做到这一点?
更新: 要澄清:
- 我不打算使用sempahore进行节流(为什么?),因为
ExecutionDataflowBlockOptions
中已经有这样的选择 - 代码片段只是用来演示“问题”的虚拟对象;
Task.Run()
仅用作实际异步功能的占位符。 - 我真正想要做的是:
以并行方式处理所有消息。
错误时不要取消进一步的消息处理。
处理完所有消息后,返回并指示至少发生了错误,然后返回所有错误->带有AggregateException的
Task.WhenAll()
的工作原理。 我知道我可以try{}catch{}
内的ActionBlock
并以某种方式存储异常,但是我想知道是否有任何配置可能性可以简化此操作。 无论如何,仅在我使用ActionBlock
的任何地方使用try catch并收集异常并没什么大不了的。我只是发现Task.WhenAll()
+PreserveException
扩展名对我而言更干净。
解决方法
不清楚问题要问什么。不过,很明显,ActionBlock被滥用了。不需要Task.Run
,因为ActionBlock已经使用了一个或多个辅助任务。不需要信号灯,因为ActionBlock(和其他块)已经通过限制辅助任务和输入队列的数量来支持限制。
代码似乎也试图将异常用作控制流机制,这在过程代码中也是错误的。
异常并不意味着可以逃避该块。 Dataflow is a completely different computing paradigm来自通用过程范式-没有相互调用的函数,因此没有 caller 来接收和处理异常。
在数据流中,块在单个方向上相互传递消息。块组合在管道或网络中,接收消息,对其进行处理,然后将其传递给任何连接的块。如果发生异常,则没有“调用者”可以接收该异常。未处理的异常是灾难性的,它会破坏整个管道-不仅是单个块,而且与PropagateCompletion
设置为true时所链接的任何下游块有关。上游区块永远不会知道这一点,从而导致意外情况。
节流
使用ActionBlock进行限制很容易-对于初学者来说,所有块都仅使用一个工作者任务。可以通过限制上游呼叫者的输入缓冲区并使用await block.SendAsync()
而不是block.Post
来限制上游呼叫者。不需要Task.Run
,因为该块已经使用了工作任务:
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism=2,BoundedCapacity=2
};
var block =new ActionBlock<Message>(processMessage,options);
...
async Task processMessage(Message msg) { ...}
这足以只允许两个并发操作,并且如果已经有两个消息等待,则停止发布者。如果缓冲区已满,则以下代码中的SendAsync
将等待直到插槽可用:
foreach(var msg in someInputCollection)
{
await block.SendAsync(msg);
}
就是这样。该块将同时处理2条消息(默认值为1),并且一次仅在其输入缓冲区中接受2条消息。如果缓冲区已满,则发布循环将等待。
可以通过在处理方法中添加延迟来实现快速和肮脏速率限制:
var block =new ActionBlock<Message>(msg=>{
await Task.Delay(200);
await processMessage(msg);
},options);
有条件的路由
问题的代码似乎正在使用异常来实现控制流。在任何库或范例中这都是错误的。由于数据流在网络中工作,因此控制流的等效条件路由。
通过LinkTo重载也可以使用此重载,该重载接受一个predicate
参数,该参数决定是否应沿着特定的链接传递消息。
在问题的情况下,假设有一个上游TransformBlock
会产生整数,LinkTo
可用于将消息路由到不同的BufferBlocks:
var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();
var block=new TransformBlock<Message,int>(...);
//Success if x>=5
block.LinkTo(success,x=>x>=5);
//By default,everything else goes to Failure
block.LinkTo(failure);
就是这样。唯一的“技巧”是谓词应涵盖所有选项,否则消息将停留在block
的输出缓冲区中。在所有其他字符之后使用default
链接有助于确保不会遗漏任何消息。
错误处理
块不应该允许异常逃逸。有几种错误处理策略取决于应用程序想要做什么。
处理并记录
一种选择是处理它们并将它们记录到位,就像处理Web应用程序中的错误一样:
var block = new ActionBlock(msg => { 尝试 { 等待processMessage(msg); } 捕获(异常除外) { _logger.LogError(exc,....); } },options);
发布到另一个街区
另一种可能性是将异常以及有关传入消息的信息直接发布到另一个块。该块可能会记录错误并发送消息,或在延迟后重试。在该块后面可能有一条不同的管道,用于在将消息发送到死信缓冲区之前,以增加的延迟重试消息,这与对消息队列的处理类似:
var block =new ActionBlock<Message>(msg=>{
try
{
await processMessage(msg);
}
catch(SomeRetriableException exc)
{
_retryBlock.Post(new RetryMsg(msg,exc));
}
catch(Exception exc)
{
_logger.LogError(exc,....);
}
},options);
使用的策略取决于应用程序执行的操作。如果将ActionBlock用作简单的后台工作人员,则只需登录即可。
包裹和路线
在更高级的方案中,可以将消息包装在Envelope<>
中,该消息可以携带消息以及可能的任何异常。路由可用于将成功与失败消息分开:
class Envelope<T>
{
public T Message{get;}
public Exception Error {get;}
public Envelope (T msg)
{
Message=msg;
}
public Envelope(T msg,Exception err)
{
Message=msg;
Error=err;
}
}
该块现在返回一个信封:
var block=new TransformBlock<Envelope<Message>,Envelope<int>>(env=>{
try
{
var msg=env.Message;
....
return new Envelope(6);
}
catch(Exception exc)
{
return new Envelope(msg,exc);
}
});
这允许使用条件路由将错误路由到errorBlock
:
var errorBlock = ActionBlock<Envelope<Message>>(...);
var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();
//Send errors to `errorBlock`
block.LinkTo(errorBlock,env=>env.Error!=null);
//Success if x>=5
block.LinkTo(success,x=>x.Message>=5);
//By default,everything else goes to Failure
block.LinkTo(failure);
,
没有简单的方法来聚合所有异常并通过Completion
的{{1}}属性传播它们。不幸的是,TPL Dataflow组件不容易扩展。如果您确实愿意,您可以做到这一点,方法是将ActionBlock
封装在array_flip
内并自定义此块的custom block。例如:
ActionBlock
...但是这个简单的代码不会传播取消,也不会传播通过public class MyActionBlock<TInput> : ITargetBlock<TInput>
{
private readonly ActionBlock<TInput> _actionBlock;
private readonly ConcurrentQueue<Exception> _exceptions;
//...
public Task Completion
{
get
{
return _actionBlock.Completion.ContinueWith(t =>
{
if (_exceptions.Count > 0)
throw new AggregateException(_exceptions);
});
}
}
}
接口的Completion
方法传递的任何异常。因此,您必须花费大量的精力才能使其在所有情况下都能正常运行,并且是否值得投资会令人怀疑。