问题描述
背景
在我的 .NET 5 控制台应用程序中,我的使用者作为线程(任务)运行。目前只有一个,但未来可能会在同一个 BufferBlock 实例上同时存在赛车消费者。
当前的实现运行良好。 我坚持实现以下附加功能:
我想在配置的时间内醒来,即使缓冲区为空,仍然留在主循环中。 此用例必须与令牌的 IsCancellationRequested 唤醒区分开来, 但可选将其与“产品可供消费”区分开来不是强制性的。
我确实看到 ReceiveAsync
的过载超时,但不清楚如何涉及 OutputAvailableAsync
,它不接受超时。
问题
如何实现在给定时间段内唤醒,并保持在循环中。仅在 IsCancellationRequested 应该打破循环的情况下
public class MyConsumer
{
private readonly BufferBlock<MyProduct> _products;
public void Start(CancellationToken token)
{
Task.Factory.StartNew(() => Run(token),token);
}
private async Task Run(CancellationToken token)
{
await ConsumeAsync(token);
}
private async Task ConsumeAsync(CancellationToken token)
{
while (await _products.OutputAvailableAsync(token))
{
var product = await _products.ReceiveAsync(token);
// Consume product goes here:...
// I would like wake up here in a configured period,even the buffer is empty.
//
// How to implement this timeout based wake up? (then still remain in the loop)
// I do not even understand clearly why are we using the two waiting operations,// the 1) OutputAvailableAsync(token) then 2) ReceiveAsync(token)
}
}
}
解决方法
我对 BufferBlock
不是很熟悉,但在一般意义上:如果 async
API 不提供超时,您可以通过带有超时的取消令牌 - 并且仍然尊重现有令牌:
using var cts = new CancellationTokenSource();
// link the existing CancellationToken so that *it* can propagate cancellation
using var linked = token.Register(
static s => ((CancellationTokenSource)s).Cancel(),cts,false);
// add a timeout
cts.CancelAfter(yourTimeoutHere);
// use this new combined token to do the magic
await DoSomethingAsync(cts.Token);
(如果您使用的不是 C# 9 或更高版本,请删除 static
;这只是验证我们不会因回调中捕获的变量而导致额外分配)
一旦你有了它,你就可以简单地响应你的超时作为增量工作,抓住OperationCanceledException
,然后做你需要的。有一点需要注意的是,如果取消发生,要知道 DoSomethingAsync
中会保留什么状态。这将是特定于场景的。