Many-to-many TPL Dataflow does not process all inputs

Problem description

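I have a TPL Dataflow pipeline with two sources and two targets linked in a many-to-many fashion. The target blocks appear to complete successfully, but they usually drop one or more inputs. Below is the simplest complete repro I could come up with. Any ideas?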

Notes:

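  1. The problem only occurs when an artificial delay is used while generating the inputs.
  2. Complete() is called successfully on both sources, but one source's Completion task stays in the WaitingForActivation state even though both targets have completed successfully.
  3. I can't find any documentation saying that many-to-many dataflows are unsupported, and the answer to this question implies that they are supported: https://social.msdn.microsoft.com/Forums/en-US/19d831af-2d3f-4d95-9672-b28ae53e6fa0/completion-of-complex-graph-dataflowgraph-object?forum=tpldataflow
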
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private const int NumbersPerSource = 10;
    private const int MaxDelayMilliseconds = 10;

    static async Task Main(string[] args)
    {
        int numbersProcessed = 0;

        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();

        var target1 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var target2 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));

        var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
        source1.LinkTo(target1, linkOptions);
        source1.LinkTo(target2, linkOptions);
        source2.LinkTo(target1, linkOptions);
        source2.LinkTo(target2, linkOptions);

        var task1 = Task.Run(() => Post(source1));
        var task2 = Task.Run(() => Post(source2));

        // source1 or source2 Completion tasks may never complete even though Complete is always successfully called.
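        //await Task.WhenAll(task1, task2, source1.Completion, source2.Completion, target1.Completion, target2.Completion);
        // Wait for both producers and both targets, but not for the sources' Completion tasks (see above).
        await Task.WhenAll(task1, task2, target1.Completion, target2.Completion);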

        Console.WriteLine($"{numbersProcessed} of {NumbersPerSource * 2} numbers processed.");
    }

    private static async Task Post(BufferBlock<int> source)
    {
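        foreach (var i in Enumerable.Range(0, NumbersPerSource)) {
            await Task.Delay(TimeSpan.FromMilliseconds(GetRandomMilliseconds()));
            // Keep Post outside Debug.Assert: Debug.Assert calls are compiled out of
            // Release builds, which would silently skip the Post as well.
            bool posted = source.Post(i);
            Debug.Assert(posted);
        }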
        source.Complete();
    }

    private static Random Random = new Random();

    private static int GetRandomMilliseconds()
    {
        lock (Random) {
            return Random.Next(0, MaxDelayMilliseconds);
        }
    }
}

Solution

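As @MikeJ pointed out in a comment, linking the blocks with PropagateCompletion in a many-to-many dataflow configuration can cause some target blocks to complete prematurely. In this case, as soon as either of the two source blocks completes, both target1 and target2 are marked as completed, and the other source can then never complete, because it still has messages in its output buffer. Those messages are never consumed, because none of the linked target blocks is willing to accept them any more.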
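The mechanism can be reproduced deterministically, without random delays. The sketch below is an illustration written for this explanation, not part of the original answer; the class name `StrandedMessageDemo` and the 500 ms timeout are arbitrary choices. `source1` completes first, its completion is propagated to the shared target, and a message posted to `source2` afterwards stays stuck in `source2`'s buffer, so `source2.Completion` never finishes (which matches note 2 of the question).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class StrandedMessageDemo
{
    // Returns whether source2 managed to complete, how many items are stuck in its
    // buffer, and the status of its Completion task.
    public static async Task<(bool Completed, int Stranded, TaskStatus Status)> RunAsync()
    {
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target = new ActionBlock<int>(_ => { });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        source1.LinkTo(target, linkOptions);
        source2.LinkTo(target, linkOptions);

        // source1 finishes first; its completion is propagated to the shared target.
        source1.Complete();
        await target.Completion;

        // source2 still accepts the message into its own output buffer...
        source2.Post(42);
        source2.Complete();

        // ...but the completed target declines it, so source2 can never drain its buffer.
        Task finished = await Task.WhenAny(source2.Completion, Task.Delay(500));
        bool completed = finished == source2.Completion;
        return (completed, source2.Count, source2.Completion.Status);
    }

    static async Task Main()
    {
        var (completed, stranded, status) = await RunAsync();
        Console.WriteLine($"source2 completed: {completed}");
        Console.WriteLine($"items stranded in source2: {stranded}");
        Console.WriteLine($"source2.Completion.Status: {status}");
    }
}
```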

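To work around this, you can use the custom PropagateCompletion method below, which completes the targets only after all of the sources have completed (or faults them if any source faulted):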

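public static async void PropagateCompletion(IDataflowBlock[] sources, IDataflowBlock[] targets)
{
    // Arguments validation omitted
    // Completes when every source has completed; faulted if any source faulted.
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));

    // The exception is swallowed here and inspected through IsFaulted below, so it
    // cannot escape this async void method.
    try { await allSourcesCompletion.ConfigureAwait(false); } catch { }

    var exception = allSourcesCompletion.IsFaulted ?
        allSourcesCompletion.Exception : null;

    // Only now, with every source finished, complete (or fault) all the targets.
    foreach (var target in targets)
    {
        if (exception != null) target.Fault(exception); else target.Complete();
    }
}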
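To see the fault branch of the method above in action, here is a small sketch that faults one source and completes the other; both targets should end up faulted. The class name `FaultPropagationDemo` and the exception message are illustrative choices, and the helper is repeated verbatim (minus comments) so the sketch compiles on its own. Note that `Fault` is called through an `IDataflowBlock` cast, since dataflow blocks implement it explicitly.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class FaultPropagationDemo
{
    // Same helper as in the answer, repeated so this sketch is self-contained.
    public static async void PropagateCompletion(IDataflowBlock[] sources, IDataflowBlock[] targets)
    {
        Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }
        var exception = allSourcesCompletion.IsFaulted ? allSourcesCompletion.Exception : null;
        foreach (var target in targets)
        {
            if (exception != null) target.Fault(exception); else target.Complete();
        }
    }

    // Faults source1, completes source2, and reports whether each target ended up faulted.
    public static async Task<(bool Target1Faulted, bool Target2Faulted)> RunAsync()
    {
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target1 = new ActionBlock<int>(_ => { });
        var target2 = new ActionBlock<int>(_ => { });

        source1.LinkTo(target1);
        source1.LinkTo(target2);
        source2.LinkTo(target1);
        source2.LinkTo(target2);
        PropagateCompletion(new IDataflowBlock[] { source1, source2 },
                            new IDataflowBlock[] { target1, target2 });

        ((IDataflowBlock)source1).Fault(new InvalidOperationException("source1 failed"));
        source2.Complete();

        // Wait for both targets; an exception is expected here because they get faulted.
        try { await Task.WhenAll(target1.Completion, target2.Completion); } catch { }
        return (target1.Completion.IsFaulted, target2.Completion.IsFaulted);
    }

    static async Task Main()
    {
        var (t1, t2) = await RunAsync();
        Console.WriteLine($"target1 faulted: {t1}, target2 faulted: {t2}");
    }
}
```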

Usage example:

source1.LinkTo(target1);
source1.LinkTo(target2);
source2.LinkTo(target1);
source2.LinkTo(target2);
PropagateCompletion(new[] { source1, source2 }, new[] { target1, target2 });

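In this example, note that no DataflowLinkOptions are passed when linking the sources to the targets; completion is propagated solely by the custom PropagateCompletion method.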
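Putting the pieces together, the question's repro can be rewritten to use the helper. This is a sketch, not the original poster's code: the names `FixedProgram`, `RunAsync` and `Rng` are introduced here, and `Post` is kept outside `Debug.Assert` so it also runs in Release builds. With the plain links, awaiting every block's `Completion` no longer hangs, and all inputs reach a target.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class FixedProgram
{
    private const int NumbersPerSource = 10;
    private const int MaxDelayMilliseconds = 10;
    private static readonly Random Rng = new Random();

    // The answer's helper: complete (or fault) all targets only after all sources are done.
    public static async void PropagateCompletion(IDataflowBlock[] sources, IDataflowBlock[] targets)
    {
        Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }
        var exception = allSourcesCompletion.IsFaulted ? allSourcesCompletion.Exception : null;
        foreach (var target in targets)
        {
            if (exception != null) target.Fault(exception); else target.Complete();
        }
    }

    // Runs the many-to-many pipeline and returns how many numbers the targets processed.
    public static async Task<int> RunAsync()
    {
        int numbersProcessed = 0;
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target1 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var target2 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));

        // Plain links: no DataflowLinkOptions, so no per-link completion propagation.
        source1.LinkTo(target1);
        source1.LinkTo(target2);
        source2.LinkTo(target1);
        source2.LinkTo(target2);
        PropagateCompletion(new IDataflowBlock[] { source1, source2 },
                            new IDataflowBlock[] { target1, target2 });

        await Task.WhenAll(Task.Run(() => Post(source1)), Task.Run(() => Post(source2)));

        // Awaiting every block's Completion, which could hang in the original repro.
        await Task.WhenAll(source1.Completion, source2.Completion,
                           target1.Completion, target2.Completion);
        return numbersProcessed;
    }

    private static async Task Post(BufferBlock<int> source)
    {
        foreach (var i in Enumerable.Range(0, NumbersPerSource))
        {
            await Task.Delay(GetRandomMilliseconds());
            bool posted = source.Post(i);
            if (!posted) throw new InvalidOperationException($"Post({i}) was rejected.");
        }
        source.Complete();
    }

    private static int GetRandomMilliseconds()
    {
        lock (Rng) { return Rng.Next(0, MaxDelayMilliseconds); }
    }

    static async Task Main()
    {
        int processed = await RunAsync();
        Console.WriteLine($"{processed} of {NumbersPerSource * 2} numbers processed.");
    }
}
```

Because each source offers its messages to target1 first and an unbounded ActionBlock always accepts, in practice target1 receives everything here; the point of the sketch is that neither target is completed while a source still has buffered messages.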