奇怪的 Rx+CancellationToken 问题:有时注册的回调没有完成

问题描述

我观察到一个奇怪的现象,有时会出现在我编写的 Rx 查询中,它涉及 CancellationToken。两个回调注册到同一个 CancellationToken一个查询之外,另一个查询的一部分。 CancellationToken 的目的是发出查询终止的信号。出现的情况是有时第二个回调卡在执行中间,永远无法完成,阻止调用一个回调。

以下是重现该问题的最小示例。它不是很小,但我不能进一步减少它。例如,将 Switch 运算符替换为 Merge 会使问题消失。如果 Task.Delay(1000,cts.Token) 抛出的异常被吞下,也会发生同样的情况。

public class Program
{
    public static void Main()
    {
        var cts = new CancellationTokenSource(500);
        cts.Token.Register(() => Console.WriteLine("### Token Canceled! ###"));
        try
        {
            Observable
                .Timer(TimeSpan.Zero,TimeSpan.FromMilliseconds(1000))
                .takeuntil(Observable.Create<Unit>(observer =>
                    cts.Token.Register(() =>
                    {
                        Console.WriteLine("Before observer.OnNext");
                        observer.OnNext(Unit.Default);
                        Console.WriteLine("After observer.OnNext");
                    })))
                .Select(_ =>
                {
                    return Observable.StartAsync(async () =>
                    {
                        Console.WriteLine("Action starting");
                        await Task.Delay(1000,cts.Token);
                        return 1;
                    });
                })
                .Switch()
                .Wait();
        }
        catch (Exception ex) { Console.WriteLine("Failed: {0}",ex.Message); }
        Thread.Sleep(500);
        Console.WriteLine("Finished");
    }
}

预期输出

Action starting
Before observer.OnNext
After observer.OnNext
### Token Canceled! ###
Failed: A task was canceled.
Finished

实际输出(有时):

Action starting
Before observer.OnNext
Failed: A task was canceled.
Finished

Try it on fiddle。在问题出现之前,您可能需要运行该程序 3-4 次。请注意缺少的两个日志条目。似乎调用 observer.OnNext(Unit.Default); 永远不会完成。

我的问题是:有谁知道是什么导致了这个问题?此外,我如何修改查询CancellationToken 相关部分,以便它执行其预期目的(终止查询),而不干扰相同 CancellationToken 的其他已注册回调?

.NET 5.0.1 & .NET Framework 4.8、System.Reactive 5.0.0、C# 9


一个观察结果:如果我修改 Observable.Create 委托使其返回 disposable.Empty 而不是 CancellationTokenRegistration,如下所示:

.takeuntil(Observable.Create<Unit>(observer =>
{
    cts.Token.Register(() =>
    {
        Console.WriteLine("Before observer.OnNext");
        observer.OnNext(default);
        Console.WriteLine("After observer.OnNext");
    });
    return disposable.Empty;
}))

但我不认为忽略 cts.Token.Register 返回的注册是一种解决方法

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...