问题描述
完整的可重现代码是on github,启动可执行文件后内存很快就会飙升。代码主要位于 AsyncBlockingQueue.cs
类中。
public async Task<T> DequeueAsync(
int timeoutInMs = -1,CancellationToken cancellationToken = default)
{
try
{
using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs,cancellationToken))
{
T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
return value;
}
}
catch (ChannelClosedException cce)
{
await Console.Error.WriteLineAsync("Channel is closed.");
throw new ObjectdisposedException("Queue is disposed");
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
await Console.Error.WriteLineAsync("Dequeue Failed.");
throw;
}
}
private CancellationTokenSource GetCancellationTokenSource(
int timeoutInMs,CancellationToken cancellationToken)
{
if (timeoutInMs <= 0)
{
return null;
}
CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
return cts;
}
这样使用时,有内存泄漏:
try
{
string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
// timeout
}
解决方法
更新
来自评论:
有一个处理器可以批量处理消息。当有足够的消息或时间到时它开始处理,这就是超时取消的地方
这意味着真正需要的是一种通过计数和周期来批量处理消息的方法。做任何一个都相对容易。
此方法按计数进行批处理。该方法将消息添加到 batch
列表,直到达到限制,向下游发送数据并清除列表:
static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input,int count,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var batch=new List<Message>(count);
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
if(batch.Count==count)
{
await writer.WriteAsync(batch.ToArray());
batch.Clear();
}
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
按周期分批的方法更复杂,因为定时器可以在收到消息的同时触发。 Interlocked.Exchange
用新的列表替换现有的 batch
列表并将批处理数据发送到下游。 :
static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input,TimeSpan period,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
var batch=new List<Message>();
Timer t=new Timer(async obj =>{
var data=Interlocked.Exchange(ref batch,new List<Message>());
writer.WriteAsync(data.ToArray());
},null,TimeSpan.Zero,period);
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
}
},token)
.ContinueWith(t=>{
timer.Dispose();
writer.TryComplete(t.Exception);
});
return channel;
}
两者兼而有之 - 我仍在努力。问题是计数和计时器到期可以同时发生。最坏的情况,lock(batch)
可用于确保只有线程或循环才能向下游发送数据
原答案
通道在正确使用时不会泄漏 - 就像任何其他容器一样。 Channel 不是异步队列,也绝对不是阻塞队列。这是一个非常不同的结构,具有完全不同的习语。它是一个使用队列的高级容器。有一个很好的理由有单独的 ChannelReader 和 ChannelWriter 类。
典型的场景是让发布者创建并拥有频道。只有发布者可以写入该频道并对其调用 Complete()
。 Channel
未实现 IDisposable
,因此无法处理。发布者仅向订阅者提供 ChannelReader
。
订阅者只能看到 ChannelReader
并从中读取直到它完成。通过使用 ReadAllAsync
,订阅者可以继续从 ChannelReader 读取,直到它完成。
这是一个典型的例子:
ChannelReader<Message> Producer(CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<100;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
//Simulate some work
await Task.Delay(100);
await writer.WriteAsync(new Message(...));
}
},token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
//This casts to a ChannelReader
return channel;
}
订阅者只需要一个 ChannelReader
即可工作。通过使用 ChannelReader.ReadAllAsync,订阅者只需要 await foreach
来处理消息:
async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
//Use the message
}
}
订阅者可以通过返回 ChannelReader 来生成自己的消息。这就是事情变得非常有趣的地方,因为 Subscriber
方法成为链接步骤管道中的一个步骤。如果我们在 ChannelReader
上将方法转换为扩展方法,我们可以轻松创建整个管道。
让我们生成一些数字:
ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
var channel=Channel.CreateBounded<int>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<nums;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
await writer.WriteAsync(i*7);
await Task.Delay(100);
}
},token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
//This casts to a ChannelReader
return channel;
}
然后将它们加倍并平方:
ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(2.0*msg);
}
},token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(Math.Sqrt(msg));
}
},token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
最后打印它们
async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
Console.WriteLine(msg);
}
}
现在我们可以建立一个管道
await Generate(100)
.Double()
.Square()
.Print();
并为所有步骤添加取消令牌:
using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
.Double(cts.Token)
.Square(cts.Token)
.Print(cts.Token);
如果一个步骤产生消息的速度比长时间消耗的速度快,则内存使用量可能会增加。这可以通过使用有界通道而不是无界通道轻松处理。这样,如果一个方法太慢,之前的所有方法都必须等待才能发布新数据。
,我能够重现您所观察到的问题。恕我直言,这显然是 Channels 库中的一个缺陷。这是我的重现:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public static class Program
{
public static async Task Main()
{
var channel = Channel.CreateUnbounded<int>();
var bufferBlock = new BufferBlock<int>();
var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
var mem0 = GC.GetTotalMemory(true);
int timeouts = 0;
for (int i = 0; i < 10; i++)
{
var stopwatch = Stopwatch.StartNew();
while (stopwatch.ElapsedMilliseconds < 500)
{
using var cts = new CancellationTokenSource(1);
try
{
await channel.Reader.ReadAsync(cts.Token);
//await bufferBlock.ReceiveAsync(cts.Token);
//await asyncCollection.TakeAsync(cts.Token);
}
catch (OperationCanceledException) { timeouts++; }
}
var mem1 = GC.GetTotalMemory(true);
Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
+ $" Allocated: {mem1 - mem0:#,0} bytes");
}
}
}
输出:
1) Timeouts: 124,Allocated: 175,664 bytes
2) Timeouts: 250,Allocated: 269,720 bytes
3) Timeouts: 376,Allocated: 362,544 bytes
4) Timeouts: 502,Allocated: 453,264 bytes
5) Timeouts: 628,Allocated: 548,080 bytes
6) Timeouts: 754,Allocated: 638,800 bytes
7) Timeouts: 880,Allocated: 729,584 bytes
8) Timeouts: 1,006,Allocated: 820,304 bytes
9) Timeouts: 1,132,Allocated: 919,216 bytes
10) Timeouts: 1,258,Allocated: 1,011,928 bytes
每次操作泄漏大约 800 个字节,这非常令人讨厌。每次在通道中写入新值时都会回收内存,因此对于繁忙的通道,这种设计缺陷应该不是问题。但对于偶尔接收值的渠道来说,这可能是一个亮点。
还有其他异步队列实现可用,它们不会遇到同样的问题。您可以尝试注释 await channel.Reader.ReadAsync(cts.Token);
行并取消注释以下两行中的任何一行。您将看到 BufferBlock<T>
库中的 TPL Dataflow 和 AsyncCollection<T>
包中的 Nito.AsyncEx.Coordination 都允许从队列中异步检索超时,而不会出现内存泄漏。
我太专注于实际问题的技术细节,我忘记了这个问题几乎是开箱即用的。
从评论看来,实际问题是:
有一个处理器可以批量处理消息。当有足够的消息或时间到时它开始处理,这就是超时取消的地方
这是由 Buffer 的 ReactiveX.NET 运营商提供的,该运营商由创建 System.Linq.Async 的同一团队构建:
ChannelReader<Message> reader=_channel;
IAsyncEnumerable<IList<Message>> batchItems = reader.ReadAllAsync(token)
.ToObservable()
.Buffer(TimeSpan.FromSeconds(30),5)
.ToAsyncEnumerable();
await foreach(var batch in batchItems.WithCancellation(token))
{
....
}
这些调用可以转换为扩展方法,因此问题的类可以有一个 DequeueAsync
或 BufferAsync
方法,而不是 GetWorkItemsAsync
:
public IAsyncEnumerable<T[]> BufferAsync(
TimeSpan timeSpan,CancellationToken cancellationToken = default)
{
return _channel.Reader.BufferAsync(timeSpan,count,cancellationToken);
}
ToObservable
和 ToAsyncEnumerable
由 System.Linq.Async 提供,并在 IAsyncEnumerable
和 IObservable
(ReactiveX.NET 使用的接口)之间转换。
Buffer 由 System.Reactive 提供并按计数或周期缓冲项目,甚至允许重叠序列。
虽然 LINQ 和 LINQ to Async 提供了对对象的查询操作符,但 Rx.NET 对基于时间的事件流也做同样的事情。可以随时间聚合、按时间缓冲事件、限制它们等。Buffer 的(非官方)文档页面中的示例展示了如何创建重叠序列(例如滑动窗口)。同一页面显示了如何使用 Sample
或 Throttle
通过仅传播一段时间内的最后一个事件来限制快速事件流。
Rx 使用推送模型(新事件被推送给订阅者),而 IAsyncEnumerable 与 IEnumerable 一样,使用拉模型。 ToAsyncEnumerable()
将缓存项目,直到它们被请求,如果没有人在听,这可能会导致问题。
通过这些方法,人们甚至可以创建扩展方法来缓冲或限制发布者:
//Returns all items in a period
public static IAsyncEnumerable<IList<T>> BufferAsync<T>(
this ChannelReader<T> reader,TimeSpan timeSpan,CancellationToken token = default)
{
return reader.ReadAllAsync(token)
.ToObservable()
.Buffer(timeSpan,count)
.ToAsyncEnumerable();
}
//Return the latest item in a period
public static IAsyncEnumerable<T> SampleAsync<T>(
this ChannelReader<T> reader,CancellationToken token = default)
{
return reader.ReadAllAsync(token)
.ToObservable()
.Sample(timeSpan)
.ToAsyncEnumerable();
}