问题描述
这是我的第一个问题,我是第一次将DataFlow与broadcastBlock和ActionBlock一起使用,希望在这里得到解决方案。 这是结构。
型号
k*d
DataFlow逻辑
class SampleModel
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public bool Success { get; set; } = true;
public object UniqueData { get; set; }
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Id - {Id}");
sb.AppendLine($"Success - {Success}");
sb.AppendLine($"UniqueData - {UniqueData}");
string tmp = sb.ToString();
sb.Clear();
return tmp;
}
}
呼叫者
class CreateDownloadTask
{
public async Task VeryLongProcess()
{
await Task.Run(async () =>
{
Console.WriteLine("Long Process Working..");
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Long Process Done..");
});
}
public async Task CreateSimplebroadcastX<T>(T data)
{
Action<T> process = async model =>
{
Console.WriteLine("Working..");
await VeryLongProcess();
Console.WriteLine("Done");
};
var broad = new broadcastBlock<T>(null);
var action = new ActionBlock<T>(process);
var dflo = new DataflowLinkOptions { PropagateCompletion = true };
broad.LinkTo(action,dflo);
await broad.SendAsync(data);
broad.Complete();
await action.Completion.ContinueWith(async tsk =>
{
Console.WriteLine("Continue data");
}).ConfigureAwait(false);
Console.WriteLine("All Done");
}
}
我希望结果应该是
var task = cdt.CreateSimplebroadcastX<SampleModel>(new SampleModel
{
UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLin("Completed");
但是我所拥有的是
Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed
当ther是ActionBlock中的Working..
Long Process Working..
Continue data
All Done
Completed
Long Process Done..
Done
时,就会发生这种情况。
现在的问题是,没有async-await
,是否可以按我预期的结果?
那意味着ActionBlock.Completion是否要等到ActionBlock内部的WaitHandle
或Action
完成执行?
或者我在用这种模式做错了吗?
在此先感谢您,我的英语不好。
解决方法
您的问题在这里:
Action<T> process = async model => ...
该代码创建一个should be avoided的async void
方法。您应该避免使用async void
的原因之一是因为很难知道该方法何时完成。而这正是正在发生的事情:ActionBlock<T>
无法知道您的代表何时完成,因为它是async void
。
正确的delegate type for an asynchronous method without a return value that takes a single argument是Func<T,Task>
:
Func<T,Task> process = async model => ...
现在异步方法返回了Task
,ActionBlock
可以知道它何时完成。