Producer/Consumer with BufferBlock,即使缓冲区中没有产品,如何定期唤醒?

问题描述

背景

在我的 .NET 5 控制台应用程序中,我的使用者作为线程(任务)运行。目前只有一个,但未来可能会在同一个 BufferBlock 实例上同时存在赛车消费者。

当前的实现运行良好。 我坚持实现以下附加功能

我想在配置的时间内醒来,即使缓冲区为空,仍然留在主循环中。 此用例必须与令牌的 IsCancellationRequested 唤醒区分开来, 但可选将其与“产品可供消费”区分开来不是强制性的。

我确实看到 ReceiveAsync 的过载超时,但不清楚如何涉及 OutputAvailableAsync,它不接受超时。

问题

如何实现在给定时间段内唤醒,并保持在循环中。仅在 IsCancellationRequested 应该打破循环的情况下

public class MyConsumer
{
    private readonly BufferBlock<MyProduct> _products;

    public void Start(CancellationToken token)
    {
        Task.Factory.StartNew(() => Run(token),token);
    }

    private async Task Run(CancellationToken token)
    {
        await ConsumeAsync(token);
    }

    private async Task ConsumeAsync(CancellationToken token)
    {
        while (await _products.OutputAvailableAsync(token))
        {
            var product = await _products.ReceiveAsync(token);
            // Consume product goes here:...

            // I would like wake up here in a configured period,even the buffer is empty.
            //
            // How to implement this timeout based wake up? (then still remain in the loop)
            // I do not even understand clearly why are we using the two waiting operations,// the 1) OutputAvailableAsync(token) then 2) ReceiveAsync(token)
        }
    }
}

解决方法

我对 BufferBlock 不是很熟悉,但在一般意义上:如果 async API 不提供超时,您可以通过带有超时的取消令牌 - 并且仍然尊重现有令牌:

using var cts = new CancellationTokenSource();
// link the existing CancellationToken so that *it* can propagate cancellation
using var linked = token.Register(
    static s => ((CancellationTokenSource)s).Cancel(),cts,false);
// add a timeout
cts.CancelAfter(yourTimeoutHere);
// use this new combined token to do the magic
await DoSomethingAsync(cts.Token);

(如果您使用的不是 C# 9 或更高版本,请删除 static;这只是验证我们不会因回调中捕获的变量而导致额外分配)


一旦你有了它,你就可以简单地响应你的超时作为增量工作,抓住OperationCanceledException,然后做你需要的。有一点需要注意的是,如果取消发生,要知道 DoSomethingAsync 中会保留什么状态。这将是特定于场景的。