TPL数据流-终结块

问题描述

我有一个用于Web抓取的工作管道。它正在下载基于URL的页面,解析内容,然后将结果拆分为块(每个x元素都保存到DB中),并且运行正常。

但是我还需要一个额外的步骤,该步骤将总结管道中已完成的所有操作。我当前的实现是通过管道传递相同的对象(仅在后续步骤中添加一些值),所以我做了一些repro代码,该代码应显示我想要实现的目标。

Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var resultsToFinalize = new List<string>();
var downloadUrl = new TransformBlock<string,string>(url =>
{
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock<string,string>(content =>
{
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new ActionBlock<string[]>(results =>
{
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(",",results)}");
    results.ToList().ForEach(t => resultsToFinalize.Add(t));
},new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

downloadUrl.LinkTo(parseContent,new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer,new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb,new DataflowLinkOptions
{
    PropagateCompletion = true
});

Enumerable.Range(2,9).Select(t => downloadUrl.Post($"http://some_site_to_parse.com{t}"));

downloadUrl.Complete();
saveToDb.Completion.Wait();
Console.WriteLine(String.Join(Environment.NewLine,resultsToFinalize));

当前,它正在运行,因为有外部resultsToFinalize变量来收集所有结果。

可能我应该将saveToDb更改为TransformBlock。但是如何累积结果直到整个管道准备就绪(最好是在所有(上一个块)完成后触发这个额外的块)

从理论上讲,我可以在此额外步骤之前再创建一个BatchBlock并将其大小设置为“输入大小”,但这似乎有点不客气:)

否则您将如何解决这个问题?

[UPDATE 12.08.2020 16:47] 似乎有些人难以理解-我想要实现的目标。因此,我将明确说明:我已经发布了一些代码。我希望有相同的输出,我的代码正在生成:

处理开始:2020年8月12日16:27:46

结果:1​​2.08.2020 16:27:52解析结果为:http://some_site_to_parse.com2,解析结果为:http://some_site_to_parse.com3,解析结果为:http://some_site_to_parse.com4 >

结果:1​​2.08.2020 16:27:57解析结果为:http://some_site_to_parse.com5,解析结果为:http://some_site_to_parse.com6,解析结果为:http://some_site_to_parse.com7 >

结果:1​​2.08.2020 16:28:00解析结果为:http://some_site_to_parse.com8,解析结果为:http://some_site_to_parse.com9,解析结果为:http://some_site_to_parse.com10 >

解析结果:http://some_site_to_parse.com2

解析结果:http://some_site_to_parse.com3

解析结果:http://some_site_to_parse.com4

解析结果:http://some_site_to_parse.com5

解析结果:http://some_site_to_parse.com6

解析结果:http://some_site_to_parse.com7

解析结果:http://some_site_to_parse.com8

解析结果:http://some_site_to_parse.com9

解析结果:http://some_site_to_parse.com10

但不使用resultsToFinalize(使用TPL的功能:))

我认为可能将saveToDb从ActionBlock更改为TransformBlock。最后可能应该有一些新的ActionBlock。问题是如何设置它-以便仅触发一次。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...