问题描述
我正在使用Channel
中的system.threading.channels
,并且想批量读取项目(5个项目),并且我有如下方法,
public class Batcher
{
private readonly Channel<Measurementviewmodel> _channel;
public Batcher()
{
_channel = Channel.CreateUnbounded<Measurementviewmodel>();
}
public async Task<Measurementviewmodel[]> ReadBatchAsync(int batchSize,CancellationToken stoppingToken)
{
var result = new Measurementviewmodel[batchSize];
for (var i = 0; i < batchSize; i++)
{
result[i] = await _channel.Reader.ReadAsync(stoppingToken);
}
return result;
}
}
在asp.net核心后台服务中,我像下面这样使用它,
public class WriterService : BackgroundService
{
private readonly Batcher _batcher;
public WriterService(Batcher batcher)
{
_batcher = batcher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var batchOfItems = await _batcher.ReadBatchAsync(5,stoppingToken);
var range = string.Join(',',batchOfItems.Select(item => item.Value));
var x = range;
}
}
}
这是可行的,只要Channel
中有5个项目,我就会得到range
。
问题是,当Channel
中仅剩2个项目,并且自最近10分钟以来没有项目进入Channel
时,如何读取Channel
中其余2个项目? / p>
解决方法
您可以创建一个linked CancellationTokenSource
,以便同时监视外部取消请求和内部引起的超时。下面是通过为ChannelReader
类创建ReadBatchAsync
扩展方法来使用此技术的示例:
public static async ValueTask<T[]> ReadBatchAsync<T>(
this ChannelReader<T> channelReader,int batchSize,TimeSpan timeout,CancellationToken cancellationToken = default)
{
// Arguments validation omitted
var items = new List<T>(batchSize);
using (var linkedCTS
= CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
linkedCTS.CancelAfter(timeout);
while (true)
{
var token = items.Count == 0 ? cancellationToken : linkedCTS.Token;
T item;
try
{
item = await channelReader.ReadAsync(token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
break; // The cancellation was induced by timeout (ignore it)
}
catch (ChannelClosedException)
{
if (items.Count == 0) throw;
break;
}
items.Add(item);
if (items.Count >= batchSize) break;
}
}
return items.ToArray();
}
此方法将在经过指定的timeout
之后立即生成一个批处理,或者如果达到了batchSize
,则将更快地生成一个批处理,前提是该批处理至少包含一项。否则,它将在收到第一件物品后立即产生单件物品。
如果通过调用channel.Writer.Complete()
方法完成了频道,并且其中不包含更多项目,则ReadBatchAsync
方法将传播与本地{{ 1}}方法。
如果外部ReadAsync
被取消,则通过抛出CancellationToken
来传播取消。这是大多数可取消API的ChannelClosedException
。
用法示例:
OperationCanceledException