带有 CancellationTokenSource 的通道在处理后超时内存泄漏

问题描述

完整的可重现代码on github,启动可执行文件后内存很快就会飙升。代码主要位于 AsyncBlockingQueue.cs 类中。

以下代码实现了一个简单的异步“阻塞”队列:

        public async Task<T> DequeueAsync(
            int timeoutInMs = -1,CancellationToken cancellationToken = default)
        {
            try
            {
                using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs,cancellationToken))
                {
                    T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
                    return value;
                }
            }
            catch (ChannelClosedException cce)
            {
                await Console.Error.WriteLineAsync("Channel is closed.");
                throw new ObjectdisposedException("Queue is disposed");
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception ex)
            {
                await Console.Error.WriteLineAsync("Dequeue Failed.");
                throw;
            }
        }


        private CancellationTokenSource GetCancellationTokenSource(
            int timeoutInMs,CancellationToken cancellationToken)
        {
            if (timeoutInMs <= 0)
            {
                return null;
            }

            CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
            return cts;
        }

这样使用时,有内存泄漏:

try
{
   string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
   // timeout 
}

enter image description here

解决方法

更新

来自评论:

有一个处理器可以批量处理消息。当有足够的消息或时间到时它开始处理,这就是超时取消的地方

这意味着真正需要的是一种通过计数和周期来批量处理消息的方法。做任何一个都相对容易。

此方法按计数进行批处理。该方法将消息添加到 batch 列表,直到达到限制,向下游发送数据并清除列表:

static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input,int count,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded();
    var writer=channel.Writer;   

    _ = Task.Run(async ()=>{
        var batch=new List<Message>(count);
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
            if(batch.Count==count)
            {
                await writer.WriteAsync(batch.ToArray());
                batch.Clear();
            }
        }
    },token)
   .ContinueWith(t=>writer.TryComplete(t.Exception));
   return channel;
}

按周期分批的方法更复杂,因为定时器可以在收到消息的同时触发。 Interlocked.Exchange 用新的列表替换现有的 batch 列表并将批处理数据发送到下游。 :

static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input,TimeSpan period,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded();
    var writer=channel.Writer;   

    var batch=new List<Message>();
    Timer t=new Timer(async obj =>{
        var data=Interlocked.Exchange(ref batch,new List<Message>());
        writer.WriteAsync(data.ToArray());
    },null,TimeSpan.Zero,period);

    _ = Task.Run(async ()=>{
        
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
        }
    },token)
   .ContinueWith(t=>{
        timer.Dispose();
        writer.TryComplete(t.Exception);
   });
   return channel;
}

两者兼而有之 - 我仍在努力。问题是计数和计时器到期可以同时发生。最坏的情况,lock(batch) 可用于确保只有线程或循环才能向下游发送数据

原答案

通道在正确使用时不会泄漏 - 就像任何其他容器一样。 Channel 不是异步队列,也绝对不是阻塞队列。这是一个非常不同的结构,具有完全不同的习语。它是一个使用队列的高级容器。有一个很好的理由有单独的 ChannelReader 和 ChannelWriter 类。

典型的场景是让发布者创建并拥有频道。只有发布者可以写入该频道并对其调用 Complete()Channel 未实现 IDisposable,因此无法处理。发布者仅向订阅者提供 ChannelReader

订阅者只能看到 ChannelReader 并从中读取直到它完成。通过使用 ReadAllAsync,订阅者可以继续从 ChannelReader 读取,直到它完成。

这是一个典型的例子:

ChannelReader<Message> Producer(CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<100;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }
            //Simulate some work
            await Task.Delay(100);
            await writer.WriteAsync(new Message(...));          
        }
    },token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

订阅者只需要一个 ChannelReader 即可工作。通过使用 ChannelReader.ReadAllAsync,订阅者只需要 await foreach 来处理消息:

async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        //Use the message
    }
}

订阅者可以通过返回 ChannelReader 来生成自己的消息。这就是事情变得非常有趣的地方,因为 Subscriber 方法成为链接步骤管道中的一个步骤。如果我们在 ChannelReader 上将方法转换为扩展方法,我们可以轻松创建整个管道。

让我们生成一些数字:

ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<nums;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }

            await writer.WriteAsync(i*7);  
            await Task.Delay(100);        
        }
    },token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

然后将它们加倍并平方:

ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(2.0*msg);          
        }
    },token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(Math.Sqrt(msg));          
        }
    },token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

最后打印它们

async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        Console.WriteLine(msg);
    }
}

现在我们可以建立一个管道


await Generate(100)
          .Double()
          .Square()
          .Print();

并为所有步骤添加取消令牌:

using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
          .Double(cts.Token)
          .Square(cts.Token)
          .Print(cts.Token);

如果一个步骤产生消息的速度比长时间消耗的速度快,则内存使用量可能会增加。这可以通过使用有界通道而不是无界通道轻松处理。这样,如果一个方法太慢,之前的所有方法都必须等待才能发布新数据。

,

我能够重现您所观察到的问题。恕我直言,这显然是 Channels 库中的一个缺陷。这是我的重现:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        var bufferBlock = new BufferBlock<int>();
        var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
        var mem0 = GC.GetTotalMemory(true);
        int timeouts = 0;
        for (int i = 0; i < 10; i++)
        {
            var stopwatch = Stopwatch.StartNew();
            while (stopwatch.ElapsedMilliseconds < 500)
            {
                using var cts = new CancellationTokenSource(1);
                try
                {
                    await channel.Reader.ReadAsync(cts.Token);
                    //await bufferBlock.ReceiveAsync(cts.Token);
                    //await asyncCollection.TakeAsync(cts.Token);
                }
                catch (OperationCanceledException) { timeouts++; }
            }
            var mem1 = GC.GetTotalMemory(true);
            Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
                + $" Allocated: {mem1 - mem0:#,0} bytes");
        }
    }
}

输出:

 1) Timeouts:   124,Allocated: 175,664 bytes
 2) Timeouts:   250,Allocated: 269,720 bytes
 3) Timeouts:   376,Allocated: 362,544 bytes
 4) Timeouts:   502,Allocated: 453,264 bytes
 5) Timeouts:   628,Allocated: 548,080 bytes
 6) Timeouts:   754,Allocated: 638,800 bytes
 7) Timeouts:   880,Allocated: 729,584 bytes
 8) Timeouts: 1,006,Allocated: 820,304 bytes
 9) Timeouts: 1,132,Allocated: 919,216 bytes
10) Timeouts: 1,258,Allocated: 1,011,928 bytes

Try it on Fiddle.

每次操作泄漏大约 800 个字节,这非常令人讨厌。每次在通道中写入新值时都会回收内存,因此对于繁忙的通道,这种设计缺陷应该不是问题。但对于偶尔接收值的渠道来说,这可能是一个亮点。

还有其他异步队列实现可用,它们不会遇到同样的问题。您可以尝试注释 await channel.Reader.ReadAsync(cts.Token); 行并取消注释以下两行中的任何一行。您将看到 BufferBlock<T> 库中的 TPL DataflowAsyncCollection<T> 包中的 Nito.AsyncEx.Coordination 都允许从队列中异步检索超时,而不会出现内存泄漏。

,

我太专注于实际问题的技术细节,我忘记了这个问题几乎是开箱即用的。

从评论看来,实际问题是:

有一个处理器可以批量处理消息。当有足够的消息或时间到时它开始处理,这就是超时取消的地方

这是由 BufferReactiveX.NET 运营商提供的,该运营商由创建 System.Linq.Async 的同一团队构建:

ChannelReader<Message> reader=_channel;

IAsyncEnumerable<IList<Message>> batchItems = reader.ReadAllAsync(token)
                                              .ToObservable()
                                              .Buffer(TimeSpan.FromSeconds(30),5)
                                              .ToAsyncEnumerable();

await foreach(var batch in batchItems.WithCancellation(token))
{
 ....
}

这些调用可以转换为扩展方法,因此问题的类可以有一个 DequeueAsyncBufferAsync 方法,而不是 GetWorkItemsAsync

public IAsyncEnumerable<T[]> BufferAsync(
            TimeSpan timeSpan,CancellationToken cancellationToken = default)
{
    return _channel.Reader.BufferAsync(timeSpan,count,cancellationToken);
}

ToObservableToAsyncEnumerableSystem.Linq.Async 提供,并在 IAsyncEnumerableIObservable(ReactiveX.NET 使用的接口)之间转换。

BufferSystem.Reactive 提供并按计数或周期缓冲项目,甚至允许重叠序列。

虽然 LINQ 和 LINQ to Async 提供了对对象的查询操作符,但 Rx.NET 对基于时间的事件流也做同样的事情。可以随时间聚合、按时间缓冲事件、限制它们等。Buffer 的(非官方)文档页面中的示例展示了如何创建重叠序列(例如滑动窗口)。同一页面显示了如何使用 SampleThrottle 通过仅传播一段时间内的最后一个事件来限制快速事件流。

Rx 使用推送模型(新事件被推送给订阅者),而 IAsyncEnumerable 与 IEnumerable 一样,使用拉模型。 ToAsyncEnumerable() 将缓存项目,直到它们被请求,如果没有人在听,这可能会导致问题。

通过这些方法,人们甚至可以创建扩展方法来缓冲或限制发布者:

    //Returns all items in a period
    public static IAsyncEnumerable<IList<T>> BufferAsync<T>(
        this ChannelReader<T> reader,TimeSpan timeSpan,CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Buffer(timeSpan,count)
            .ToAsyncEnumerable();
    }
        
        
    //Return the latest item in a period
    public static IAsyncEnumerable<T> SampleAsync<T>(
        this ChannelReader<T> reader,CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Sample(timeSpan)
            .ToAsyncEnumerable();
    }