问题描述
目的
我正在尝试创建一个管道,每次从文件中读取一个字节记录,然后将其发送到“ BufferBlock”,后者会在缓冲区块中追加项目。通过简单的LinkTo()方法将其链接到TransformBlock
BufferBlock<byte[]> buffer = new BufferBlock<byte[]>();
TransformBlock<byte[],MyObject> transform = new TransformBlock<byte[],MyObject>(bytes =>
{
return FromBytesTOMyObject(bytes);
});
private void ReadFileAndAppend()
{
buffer.LinkTo(transform,new DataflowLinkOptions() { PropagateCompletion = true });
BinaryReader br = new BinaryReader(new FileStream("C:\\Users\\MyUser\\myFile.raw",FileMode.Open,FileAccess.Read,FileShare.Read));
int count;
byte[] record = new byte[4000];
// Post more messages to the block.
Task.Run(async () =>
{
while ((count = br.Read(record,record.Length)) != 0)
await buffer.SendAsync(record);
buffer.Complete();
});
transform.Completion.Wait();
Console.WriteLine("");
static public MyObject FromBytesToMyObject(byte[] record)
{
MyObject object = new MyObject();
object.counter = BitConverter.ToInt32(record,0);
object.nPoints = BitConverter.ToInt32(record,4);
for (int i = 0; i < object.nPoints; i++)
{
int index = i * 4;
object.A[i] = BitConverter.ToSingle(record,index + 8);
}
return object;
}
从FromBytesToMyObject()方法可以看出,每个读取的记录内都有一个计数器。 因此,每条记录永远都不会拥有与另一条记录相等的计数器(我也通过HxD这样的字节读取器进行了检查)
问题
使用此设置,我认为文件的读取和解释可以顺利进行。但是进入调试并在读取约50条或更多记录后在“ while”中插入断点,我注意到在TransformBlock的OutputQueue中 具有相同计数器的记录组排队,因此记录相同。
示例:
仅考虑计数器的确切队列: 1,2,3,4,5,6,7,8,9,10。
我在OutputQueue中实际看到的队列:1,5 ....
你能告诉我我错了吗?
解决方法
您将一遍又一遍地重复使用同一byte[] record
,而没有任何线程安全方面的考虑。难怪事情进展很快。如果要确保整个操作的正确性,则每次必须使用不同的缓冲区:
while (true)
{
var record = new byte[4000];
var count = binaryReader.Read(record,record.Length);
if (count == 0) break;
Array.Resize(ref record,count);
await buffer.SendAsync(record);
}
buffer.Complete();
如果您还关心性能并且不想让垃圾收集器负担过多,则应查看ArrayPool
类。但是要小心,因为这堂课提供了新的射击方法。