c# – 如何聚合来自异步生成器的数据并将其写入文件?

我正在学习C#中的异步/等待模式.目前我正在尝试解决这样的问题:

>有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中.
>设备只有一个ReadAsync()方法一次报告一个数据包.
>我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次.
>如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败.

到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?

static async Task TestLogger(Device device,int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}

解决方法

您可以使用以下想法,前提是flush的条件是数据包的数量(最多1000个).我没有测试它.它利用了 this question中Stephen Cleary的 AsyncProducerConsumerQueue<T>特色.
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(),token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task prevIoUsFlush = Task.Fromresult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!prevIoUsFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the prevIoUs flush if not ready
           throw new Exception("Failed to flush on time.");
       }
       await prevIoUsFlush; // it's completed,observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       prevIoUsFlush = _stream.WriteAsync(nextBatch,nextBatch.Count,cts.Token);
    }
}

如果您不想让记录器失败,而是希望取消刷新并继续下一批,则可以对此代码进行最小的更改.

回应@ l3arnon评论

  1. A packet is not a byte,it’s byte[]. 2. You haven’t used the OP’s ToHexString. 3. AsyncProducerConsumerQueue is much less robust and
    tested than .Net’s TPL Dataflow. 4. You await prevIoUsFlush for errors
    just after you throw an exception which makes that line redundant.
    etc. In short: I think the possible added value doesn’t justify this
    very complicated solution.

>“数据包不是字节,它是字节[]” – 数据包是一个字节,这从OP的代码中很明显:buffer [i] = await device.ReadAsync().然后,一批数据包是byte [].
>“你还没有使用OP的ToHexString.” – 目标是展示如何使用Stream.WriteAsync本身接受取消令牌,而不是WriteLineAsync,它不允许取消.将ToHexString与Stream.WriteAsync一起使用并仍然利用取消支持是微不足道的:

var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
    Environment.NewLine);
_stream.WriteAsync(hexBytes,hexBytes.Length,token);

>“AsyncProducerConsumerQueue的稳健性和测试性都不如.Net的TPL数据流” – 我不认为这是一个确定的事实.但是,如果OP关心它,他可以使用常规的BlockingCollection,它不会阻塞生产者线程.在等待下一批时阻止使用者线程是可以的,因为写入是并行完成的.与此相反,您的TPL Dataflow版本带有一个冗余cpu和锁密集操作:使用logAction.Post(数据包)将数据从生产者管道移动到写入器pipleline,逐字节.我的代码不这样做.>“在抛出导致该行冗余的异常之后,您等待prevIoUsFlush出错.” – 这条线不是多余的.也许,你错过了这一点:当prevIoUsFlush.IsFaulted或prevIoUsFlush.IsCancelled也为true时,prevIoUsFlush.IsCompleted可能为true.因此,等待prevIoUsFlush与之相关,以观察已完成任务上的任何错误(例如,写入失败),否则将丢失.

相关文章

在要实现单例模式的类当中添加如下代码:实例化的时候:frmC...
1、如果制作圆角窗体,窗体先继承DOTNETBAR的:public parti...
根据网上资料,自己很粗略的实现了一个winform搜索提示,但是...
近期在做DSOFramer这个控件,打算自己弄一个自定义控件来封装...
今天玩了一把WMI,查询了一下电脑的硬件信息,感觉很多代码都...
最近在研究WinWordControl这个控件,因为上级要求在系统里,...