ChannelReader.ReadAllAsync(CancellationToken) 实际上并未在迭代过程中取消

问题描述

我一直在研究一项功能,该功能可将通道中的耗时工作排队,并在那里使用例如迭代通道 await foreach(var item in channel.Reader.ReadAllAsync(cancellationToken)) {...}

我期望当通过 cancellationToken 请求取消时,ReadAllAsync 会在取消之后的第一次迭代中抛出。

在我看来,情况并非如此。循环继续,直到处理完所有项目,然后然后抛出一个 OperationCanceledException

至少可以这样说,这看起来有点奇怪。从 ChannelReadergithub repo 可以看出取消令牌标有 [EnumeratorCancellation] 属性,因此应该将其传递给围绕 yield return item; 生成的状态机(请如果我错了,请纠正我)。

我的问题是,这是 ReadAllAsync(CancellationToken) 的(有点)正常行为,还是我遗漏了什么?

以下是演示问题的简单测试代码 (try it on dotnetfiddle):

var channel = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 10; i++) channel.Writer.TryWrite(i);
int itemsRead = 0;
var cts = new CancellationTokenSource();
try
{
    await foreach (var i in channel.Reader.ReadAllAsync(cts.Token))
    {
        Console.WriteLine($"Read item: {i}. Requested cancellation: " +
            $"{cts.Token.IsCancellationRequested}");

        if (++itemsRead > 4 && !cts.IsCancellationRequested)
        {
            Console.WriteLine("Cancelling...");
            cts.Cancel();
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Operation cancelled. Items read: {itemsRead}");
}

这是上面的输出。请注意项目提取在中途取消后如何继续:

Read item: 1. Requested cancellation: False
Read item: 2. Requested cancellation: False
Read item: 3. Requested cancellation: False
Read item: 4. Requested cancellation: False
Read item: 5. Requested cancellation: False
Cancelling...
Read item: 6. Requested cancellation: True
Read item: 7. Requested cancellation: True
Read item: 8. Requested cancellation: True
Read item: 9. Requested cancellation: True
Read item: 10. Requested cancellation: True
Operation cancelled. Items read: 10

解决方法

这种行为是设计使然。我正在从相关的 Stephen Toub's response 复制粘贴 GitHub issue

我想问一下这种行为是否是有意为之。

是的。已经有可供立即读取的数据,因此实际上没有什么可以取消的。迭代器的实现只是坐在this tight loop

while (TryRead(out T? item)) 
{ 
   yield return item; 
} 

只要数据立即可用。一旦没有,它就会逃逸到外循环,该循环将检查取消。

也就是说,它可以改变。我对选择取消是否更可取没有强烈的看法;我希望这取决于用例。