问题描述
有谁知道合并操作符是如何实现的?我惊讶地发现 Merge 操作符可以正确地合并冷 observables:
var odd = new int[] { 1,3 }.ToObservable().Trace("odd");
var even = new int[] { 2,4 }.ToObservable().Trace("even");
odd.Merge(even).Dump("Merged");
输出:
odd1: Subscribe()
even1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: OnNext(2)
Merged on 1 -->2
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(4)
Merged on 1 -->4
odd1: OnCompleted()
odd1: dispose()
even1: OnCompleted()
even1: dispose()
Merged on 1 completed
我很快宣布 ToObservable() 调用正在使用 Scheduler.CurrentThread 来完成它的工作,从而允许“协作调度”发生。我使用对 Scheduler.CurrentThread.Schedule() 的递归调用为奇数构建了自己的可观察实现。这允许合并操作符的行为就像它对 new int[] { 1,3,5}.ToObservable() observable 所做的那样。到现在为止还挺好。我现在试图弄清楚合并运算符如何在幕后工作以允许协作调度。我已经编写了自己的合并运算符版本,以帮助我理解。我尝试将订阅调用安排到以两种不同方式合并的 observable 中,但都没有复制原始合并行为。
public class Mergedobservable<T> : IObservable<T>
{
private readonly IObservable<T> mSource;
private readonly IObservable<T> mSecond;
public Idisposable Subscribe(IObserver<T> observer)
{
Scheduler.CurrentThread.Schedule(() =>
{
mSource.Subscribe(internalObserver);
Scheduler.CurrentThread.Schedule(() =>
{
mSecond.Subscribe(internalObserver);
});
});
}
}
odd1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: Subscribe()
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(2)
Merged on 1 -->2
odd1: OnCompleted()
odd1: dispose()
even1: OnNext(4)
Merged on 1 -->4
even1: OnCompleted()
Merged on 1 completed
even1: dispose()
或者当我使用 scheduleAsync 时:
Scheduler.CurrentThread.ScheduleAsync(async (s,t) =>
{
retVal.Add(mSource.Subscribe(internalObserver));
await s.Yield();
retVal.Add(mSecond.Subscribe(internalObserver));
await s.Yield();
return disposable.Empty;
});
odd1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: Subscribe()
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(2)
Merged on 1 -->2
odd1: OnCompleted()
odd1: dispose()
even1: OnNext(4)
Merged on 1 -->4
even1: OnCompleted()
Merged on 1 completed
even1: dispose()
在这两种情况下,对第一个 subscribe 的调用都会在对下一个 observable 的 Subscribe 调用之前产生对 OnNext 的调用。我错过了什么?非常感谢任何帮助。
解决方法
我认为写这个问题有助于我更好地理解这个问题。我不需要递归调度或调用 yield。我只需要安排两个电话订阅:
Scheduler.CurrentThread.Schedule(() =>
{
mSource.Subscribe(internalObserver);
mSecond.Subscribe(internalObserver);
});