问题描述
我观察到一个奇怪的现象,有时会出现在我编写的 Rx 查询中,它涉及 CancellationToken
。两个回调注册到同一个 CancellationToken
,一个在查询之外,另一个是查询的一部分。 CancellationToken
的目的是发出查询终止的信号。出现的情况是有时第二个回调卡在执行中间,永远无法完成,阻止调用第一个回调。
以下是重现该问题的最小示例。它不是很小,但我不能进一步减少它。例如,将 Switch
运算符替换为 Merge
会使问题消失。如果 Task.Delay(1000,cts.Token)
抛出的异常被吞下,也会发生同样的情况。
public class Program
{
public static void Main()
{
var cts = new CancellationTokenSource(500);
cts.Token.Register(() => Console.WriteLine("### Token Canceled! ###"));
try
{
Observable
.Timer(TimeSpan.Zero,TimeSpan.FromMilliseconds(1000))
.takeuntil(Observable.Create<Unit>(observer =>
cts.Token.Register(() =>
{
Console.WriteLine("Before observer.OnNext");
observer.OnNext(Unit.Default);
Console.WriteLine("After observer.OnNext");
})))
.Select(_ =>
{
return Observable.StartAsync(async () =>
{
Console.WriteLine("Action starting");
await Task.Delay(1000,cts.Token);
return 1;
});
})
.Switch()
.Wait();
}
catch (Exception ex) { Console.WriteLine("Failed: {0}",ex.Message); }
Thread.Sleep(500);
Console.WriteLine("Finished");
}
}
预期输出:
Action starting
Before observer.OnNext
After observer.OnNext
### Token Canceled! ###
Failed: A task was canceled.
Finished
实际输出(有时):
Action starting
Before observer.OnNext
Failed: A task was canceled.
Finished
Try it on fiddle。在问题出现之前,您可能需要运行该程序 3-4 次。请注意缺少的两个日志条目。似乎调用 observer.OnNext(Unit.Default);
永远不会完成。
我的问题是:有谁知道是什么导致了这个问题?此外,我如何修改查询的 CancellationToken
相关部分,以便它执行其预期目的(终止查询),而不干扰相同 CancellationToken
的其他已注册回调?
.NET 5.0.1 & .NET Framework 4.8、System.Reactive 5.0.0、C# 9
另一个观察结果:如果我修改 Observable.Create
委托使其返回 disposable.Empty
而不是 CancellationTokenRegistration
,如下所示:
.takeuntil(Observable.Create<Unit>(observer =>
{
cts.Token.Register(() =>
{
Console.WriteLine("Before observer.OnNext");
observer.OnNext(default);
Console.WriteLine("After observer.OnNext");
});
return disposable.Empty;
}))
但我不认为忽略 cts.Token.Register
返回的注册是一种解决方法。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)