TPL 数据流:避免在每次调用其委托时重复运行 using 块例如写入 StreamWriter的 ActionBlock

问题描述

我需要从 IDataReader 读取 1M 行,并同时写入 n 个文本文件。这些文件中的每一个都将是可用列的不同子集;所有 n 文本文件在完成时都是 100 万行。

当前计划是一个 TransformManyBlock 来迭代 IDataReader,链接一个 broadcastBlock,链接n 个 BufferBlock/ActionBlock 对。

我试图避免的是让我的 ActionBlock 委托执行一个 using (StreamWriter x...) { x.WriteLine(); },它会打开和关闭每个输出文件一百万次。

我目前的想法是代替 ActionBlock,编写一个实现 ITargetBlock 的自定义类。有没有更简单的方法

编辑 1:讨论对我当前的问题很有价值,但到目前为止的答案都集中在文件系统行为上。为了未来搜索者的利益,问题的重点是如何在 ActionBlock 委托之外构建某种设置/拆卸。这适用于您通常包装在 using 块中的任何类型的一次性物品。

编辑 2:根据@Panagiotis Kanavos,解决方案的执行摘要在定义块之前设置对象,然后在块的 Completion.ContinueWith 中拆除对象

解决方法

通常在使用 TPL 时,我会创建自定义类,以便我可以创建用于管道中块的私有成员变量和私有方法,但不是实现 ITargetBlockISourceBlock,我'将在我的自定义类中包含我需要的任何块,然后将 ITargetBlock 和/或 ISourceBlock 作为公共属性公开,以便其他类可以使用源块和目标块将事物链接在一起。

,

即使您不必每次都打开流,一次一行写入文件本身就很昂贵。保持文件流打开也有其他问题,因为出于性能原因,文件流总是被缓冲,从 FileStream 级别一直到文件系统驱动程序。您必须定期刷新流以确保将数据写入磁盘。

要真正提高性能,您必须对记录进行批处理,例如使用 BatchBlock。一旦你这样做了,打开流的成本就可以忽略不计了。

这些行也应该在最后一刻生成,以避免生成需要被垃圾收集的临时字符串。在 n*1M 记录时,这些分配和垃圾收集的内存和 CPU 开销会很严重。

在写入之前记录库批量日志条目以避免这种性能下降。

你可以试试这样的:

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
   
    //Create or open a file for appending
    using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    {
        writer.WriteLine("{0} = {1} :{2}",record.Prop1,record.Prop5,record.Prop2);
    }

});

batchBlock.LinkTo(writerBlock,options);

或者,使用异步方法

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
   
    //Create or open a file for appending
    await using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    {
        await writer.WriteLineAsync("{0} = {1} :{2}",options);

您可以调整批处理大小和 StreamWriter 的缓冲区大小以获得最佳性能。

创建写入流的实际“块”

可以使用 Custom Dataflow block walkthrough 中显示的技术创建自定义块 - 不是创建实际的自定义块,而是创建返回 LinkTo 工作所需的任何内容的东西,在这种情况下是 { {1}}:

ITargetBlock< T>

这里的“技巧”是在块外创建流并保持活动状态直到块完成。它不会被垃圾收集,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要显式关闭它。 ITargetBlock<Record> FileExporter(string path) { var writer=new StreamWriter(path,true); var block=new ActionBlock<Record>(async msg=>{ await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop2); }); //Close the stream when the block completes block.Completion.ContinueWith(_=>write.Close()); return (ITargetBlock<Record>)target; } ... var exporter1=CreateFileExporter(path1); previous.LinkTo(exporter,options); 将关闭流,无论块是否正常完成。

这与演练中使用的代码相同,用于关闭输出 BufferBlock :

block.Completion.ContinueWith(_=>write.Close());

默认情况下流是缓冲的,因此调用 target.Completion.ContinueWith(delegate { if (queue.Count > 0 && queue.Count < windowSize) source.Post(queue.ToArray()); source.Complete(); }); 并不意味着数据将实际写入磁盘。这意味着我们不知道数据何时会真正写入文件。如果应用程序崩溃,可能会丢失一些数据。

内存、IO 和开销

在很长一段时间内处理 100 万行时,事情会加起来。可以使用例如 WriteLine 一次写入一批行,但这会导致分配 1M 临时字符串。在每次迭代中,运行时必须至少使用这些临时字符串作为批处理的 RAM。在 GC 启动冻结线程之前,RAM 使用量会开始膨胀到数百 MB,然后是 GB。

对于 100 万行和大量数据,很难调试和跟踪管道中的数据。如果出现问题,事情会非常崩溃。想象一下,例如 100 万条消息卡在 one 块中,因为一条消息被阻止了。

重要的是(出于理智和性能原因)使管道中的各个组件尽可能简单。