如何仅使用BroadcastBlock和ActionBlock使用数据流

问题描述

这是我的第一个问题,我是第一次将DataFlow与broadcastBlock和ActionBlock一起使用,希望在这里得到解决方案。 这是结构。

型号

k*d

DataFlow逻辑

class SampleModel
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public bool Success { get; set; } = true;
    public object UniqueData { get; set; }

    public override string ToString()
    {
        StringBuilder sb = new StringBuilder();
        sb.AppendLine($"Id - {Id}");
        sb.AppendLine($"Success - {Success}");
        sb.AppendLine($"UniqueData - {UniqueData}");
        string tmp = sb.ToString();
        sb.Clear();
        return tmp;
    }
}

呼叫者

class CreateDownloadTask
{
    public async Task VeryLongProcess()
    {
        await Task.Run(async () =>
        {
            Console.WriteLine("Long Process Working..");
            await Task.Delay(TimeSpan.FromSeconds(5));
            Console.WriteLine("Long Process Done..");
        });
    }

    public async Task CreateSimplebroadcastX<T>(T data)
    {
        Action<T> process = async model =>
        {
            Console.WriteLine("Working..");
            await VeryLongProcess();
            Console.WriteLine("Done");
        };


        var broad = new broadcastBlock<T>(null);

        var action = new ActionBlock<T>(process);

        var dflo = new DataflowLinkOptions { PropagateCompletion = true };

        broad.LinkTo(action,dflo);

        await broad.SendAsync(data);

        broad.Complete();

        await action.Completion.ContinueWith(async tsk =>
        {
            Console.WriteLine("Continue data");
        }).ConfigureAwait(false);

        Console.WriteLine("All Done");
    }
}

我希望结果应该是

var task = cdt.CreateSimplebroadcastX<SampleModel>(new SampleModel
{
    UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLin("Completed");

但是我所拥有的是

Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed

当ther是ActionBlock中的Working.. Long Process Working.. Continue data All Done Completed Long Process Done.. Done 时,就会发生这种情况。 现在的问题是,没有async-await,是否可以按我预期的结果?

那意味着ActionBlock.Completion是否要等到ActionBlock内部的WaitHandleAction完成执行?

或者我在用这种模式做错了吗?

在此先感谢您,我的英语不好。

解决方法

您的问题在这里:

Action<T> process = async model => ...

该代码创建一个should be avoidedasync void方法。您应该避免使用async void的原因之一是因为很难知道该方法何时完成。而这正是正在发生的事情:ActionBlock<T>无法知道您的代表何时完成,因为它是async void

正确的delegate type for an asynchronous method without a return value that takes a single argumentFunc<T,Task>

Func<T,Task> process = async model => ...

现在异步方法返回了TaskActionBlock可以知道它何时完成。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...