问题描述
我需要从 IDataReader 读取 1M 行,并同时写入 n 个文本文件。这些文件中的每一个都将是可用列的不同子集;所有 n 文本文件在完成时都是 100 万行。
当前计划是一个 TransformManyBlock 来迭代 IDataReader,链接到一个 broadcastBlock,链接到 n 个 BufferBlock/ActionBlock 对。
我试图避免的是让我的 ActionBlock 委托执行一个 using (StreamWriter x...) { x.WriteLine(); }
,它会打开和关闭每个输出文件一百万次。
我目前的想法是代替 ActionBlock,编写一个实现 ITargetBlock 的自定义类。有没有更简单的方法?
编辑 1:讨论对我当前的问题很有价值,但到目前为止的答案都集中在文件系统行为上。为了未来搜索者的利益,问题的重点是如何在 ActionBlock 委托之外构建某种设置/拆卸。这适用于您通常包装在 using 块中的任何类型的一次性物品。
编辑 2:根据@Panagiotis Kanavos,解决方案的执行摘要是在定义块之前设置对象,然后在块的 Completion.ContinueWith 中拆除对象。
解决方法
通常在使用 TPL 时,我会创建自定义类,以便我可以创建用于管道中块的私有成员变量和私有方法,但不是实现 ITargetBlock
或 ISourceBlock
,我'将在我的自定义类中包含我需要的任何块,然后将 ITargetBlock
和/或 ISourceBlock
作为公共属性公开,以便其他类可以使用源块和目标块将事物链接在一起。
即使您不必每次都打开流,一次一行写入文件本身就很昂贵。保持文件流打开也有其他问题,因为出于性能原因,文件流总是被缓冲,从 FileStream
级别一直到文件系统驱动程序。您必须定期刷新流以确保将数据写入磁盘。
要真正提高性能,您必须对记录进行批处理,例如使用 BatchBlock。一旦你这样做了,打开流的成本就可以忽略不计了。
这些行也应该在最后一刻生成,以避免生成需要被垃圾收集的临时字符串。在 n*1M 记录时,这些分配和垃圾收集的内存和 CPU 开销会很严重。
在写入之前记录库批量日志条目以避免这种性能下降。
你可以试试这样的:
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
//Create or open a file for appending
using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
writer.WriteLine("{0} = {1} :{2}",record.Prop1,record.Prop5,record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
或者,使用异步方法
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
//Create or open a file for appending
await using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
await writer.WriteLineAsync("{0} = {1} :{2}",options);
您可以调整批处理大小和 StreamWriter 的缓冲区大小以获得最佳性能。
创建写入流的实际“块”
可以使用 Custom Dataflow block walkthrough 中显示的技术创建自定义块 - 不是创建实际的自定义块,而是创建返回 LinkTo
工作所需的任何内容的东西,在这种情况下是 { {1}}:
ITargetBlock< T>
这里的“技巧”是在块外创建流并保持活动状态直到块完成。它不会被垃圾收集,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要显式关闭它。 ITargetBlock<Record> FileExporter(string path)
{
var writer=new StreamWriter(path,true);
var block=new ActionBlock<Record>(async msg=>{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop2);
});
//Close the stream when the block completes
block.Completion.ContinueWith(_=>write.Close());
return (ITargetBlock<Record>)target;
}
...
var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);
将关闭流,无论块是否正常完成。
这与演练中使用的代码相同,用于关闭输出 BufferBlock :
block.Completion.ContinueWith(_=>write.Close());
默认情况下流是缓冲的,因此调用 target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
并不意味着数据将实际写入磁盘。这意味着我们不知道数据何时会真正写入文件。如果应用程序崩溃,可能会丢失一些数据。
内存、IO 和开销
在很长一段时间内处理 100 万行时,事情会加起来。可以使用例如 WriteLine
一次写入一批行,但这会导致分配 1M 临时字符串。在每次迭代中,运行时必须至少使用这些临时字符串作为批处理的 RAM。在 GC 启动冻结线程之前,RAM 使用量会开始膨胀到数百 MB,然后是 GB。
对于 100 万行和大量数据,很难调试和跟踪管道中的数据。如果出现问题,事情会非常崩溃。想象一下,例如 100 万条消息卡在 one 块中,因为一条消息被阻止了。
重要的是(出于理智和性能原因)使管道中的各个组件尽可能简单。