问题描述
我有一个用于Web抓取的工作管道。它正在下载基于URL的页面,解析内容,然后将结果拆分为块(每个x元素都保存到DB中),并且运行正常。
但是我还需要一个额外的步骤,该步骤将总结管道中已完成的所有操作。我当前的实现是通过管道传递相同的对象(仅在后续步骤中添加一些值),所以我做了一些repro代码,该代码应显示我想要实现的目标。
Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var resultsToFinalize = new List<string>();
var downloadUrl = new TransformBlock<string,string>(url =>
{
Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
return url;
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var parseContent = new TransformBlock<string,string>(content =>
{
Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
return $"parsing result for: {content}";
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var saveToDb = new ActionBlock<string[]>(results =>
{
Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(",",results)}");
results.ToList().ForEach(t => resultsToFinalize.Add(t));
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
downloadUrl.LinkTo(parseContent,new DataflowLinkOptions
{
PropagateCompletion = true
});
parseContent.LinkTo(workBuffer,new DataflowLinkOptions
{
PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb,new DataflowLinkOptions
{
PropagateCompletion = true
});
Enumerable.Range(2,9).Select(t => downloadUrl.Post($"http://some_site_to_parse.com{t}"));
downloadUrl.Complete();
saveToDb.Completion.Wait();
Console.WriteLine(String.Join(Environment.NewLine,resultsToFinalize));
当前,它正在运行,因为有外部resultsToFinalize
变量来收集所有结果。
可能我应该将saveToDb更改为TransformBlock。但是如何累积结果直到整个管道准备就绪(最好是在所有(上一个块)完成后触发这个额外的块)
从理论上讲,我可以在此额外步骤之前再创建一个BatchBlock并将其大小设置为“输入大小”,但这似乎有点不客气:)
否则您将如何解决这个问题?
[UPDATE 12.08.2020 16:47] 似乎有些人难以理解-我想要实现的目标。因此,我将明确说明:我已经发布了一些代码。我希望有相同的输出,我的代码正在生成:
处理开始:2020年8月12日16:27:46
结果:12.08.2020 16:27:52解析结果为:http://some_site_to_parse.com2,解析结果为:http://some_site_to_parse.com3,解析结果为:http://some_site_to_parse.com4 >
结果:12.08.2020 16:27:57解析结果为:http://some_site_to_parse.com5,解析结果为:http://some_site_to_parse.com6,解析结果为:http://some_site_to_parse.com7 >
结果:12.08.2020 16:28:00解析结果为:http://some_site_to_parse.com8,解析结果为:http://some_site_to_parse.com9,解析结果为:http://some_site_to_parse.com10 >
解析结果:http://some_site_to_parse.com2
解析结果:http://some_site_to_parse.com3
解析结果:http://some_site_to_parse.com4
解析结果:http://some_site_to_parse.com5
解析结果:http://some_site_to_parse.com6
解析结果:http://some_site_to_parse.com7
解析结果:http://some_site_to_parse.com8
解析结果:http://some_site_to_parse.com9
解析结果:http://some_site_to_parse.com10
但不使用resultsToFinalize(使用TPL的功能:))
我认为可能将saveToDb从ActionBlock更改为TransformBlock。最后可能应该有一些新的ActionBlock。问题是如何设置它-以便仅触发一次。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)