每当读取记录时,都会覆盖TransformBlock <TInput,TOutput>中InputQueue的每个元素 目的问题

问题描述

目的

我正在尝试创建一个管道,每次从文件中读取一个字节记录,然后将其发送到“ BufferBlock”,后者会在缓冲区块中追加项目。通过简单的LinkTo()方法将其链接到TransformBlock ,该转换将每个字节记录转换为MyObject对象。 下面是完成所有这些操作的整个方法

    BufferBlock<byte[]> buffer = new BufferBlock<byte[]>();
    TransformBlock<byte[],MyObject> transform = new TransformBlock<byte[],MyObject>(bytes =>
    {
        return FromBytesTOMyObject(bytes);
    });

    private void ReadFileAndAppend()
    {
        buffer.LinkTo(transform,new DataflowLinkOptions() { PropagateCompletion = true });

        BinaryReader br = new BinaryReader(new FileStream("C:\\Users\\MyUser\\myFile.raw",FileMode.Open,FileAccess.Read,FileShare.Read));                                  
        int count;
        byte[] record = new byte[4000];

        // Post more messages to the block.
        Task.Run(async () =>
        {
            while ((count = br.Read(record,record.Length)) != 0)
                await buffer.SendAsync(record);
            buffer.Complete();
        });
        transform.Completion.Wait();
        Console.WriteLine("");

在TransformBlock内部调用方法下面:

static public MyObject FromBytesToMyObject(byte[] record)
    {
        MyObject object = new MyObject();
        object.counter = BitConverter.ToInt32(record,0);
        object.nPoints = BitConverter.ToInt32(record,4);

        for (int i = 0; i < object.nPoints; i++)
        {
            int index = i * 4;
            object.A[i] = BitConverter.ToSingle(record,index + 8);
        }
        return object;
    }
从FromBytesToMyObject()方法可以看出,

每个读取的记录内都有一个计数器。 因此,每条记录永远都不会拥有与另一条记录相等的计数器(我也通过HxD这样的字节读取器进行了检查)

问题

使用此设置,我认为文件的读取和解释可以顺利进行。但是进入调试并在读取约50条或更多记录后在“ while”中插入断点,我注意到在TransformBlock的OutputQueue中 具有相同计数器的记录组排队,因此记录相同。

示例:

仅考虑计数器的确切队列: 1,2,3,4,5,6,7,8,9,10。

我在OutputQueue中实际看到的队列:1,5 ....

你能告诉我我错了吗?

解决方法

您将一遍又一遍地重复使用同一byte[] record,而没有任何线程安全方面的考虑。难怪事情进展很快。如果要确保整个操作的正确性,则每次必须使用不同的缓冲区:

while (true)
{
    var record = new byte[4000];
    var count = binaryReader.Read(record,record.Length);
    if (count == 0) break;
    Array.Resize(ref record,count);
    await buffer.SendAsync(record);
}
buffer.Complete();

如果您还关心性能并且不想让垃圾收集器负担过多,则应查看ArrayPool类。但是要小心,因为这堂课提供了新的射击方法。