如何实现你自己的合并操作符来正确合并冷的 observables

问题描述

有谁知道合并操作符是如何实现的?我惊讶地发现 Merge 操作符可以正确地合并冷 observables:

         var odd = new int[] { 1,3 }.ToObservable().Trace("odd");
         var even = new int[] { 2,4 }.ToObservable().Trace("even");

         odd.Merge(even).Dump("Merged");

输出

odd1: Subscribe()
even1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: OnNext(2)
Merged on 1 -->2
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(4)
Merged on 1 -->4
odd1: OnCompleted()
odd1: dispose()
even1: OnCompleted()
even1: dispose()
Merged on 1   completed

我很快宣布 ToObservable() 调用正在使用 Scheduler.CurrentThread 来完成它的工作,从而允许“协作调度”发生。我使用对 Scheduler.CurrentThread.Schedule() 的递归调用为奇数构建了自己的可观察实现。这允许合并操作符的行为就像它对 new int[] { 1,3,5}.ToObservable() observable 所做的那样。到现在为止还挺好。我现在试图弄清楚合并运算符如何在幕后工作以允许协作调度。我已经编写了自己的合并运算符版本,以帮助我理解。我尝试将订阅调用安排到以两种不同方式合并的 observable 中,但都没有复制原始合并行为。

   public class Mergedobservable<T> : IObservable<T>
   {
      private readonly IObservable<T> mSource;
      private readonly IObservable<T> mSecond;

      public Idisposable Subscribe(IObserver<T> observer)
      {
         Scheduler.CurrentThread.Schedule(() =>
        {
           mSource.Subscribe(internalObserver);
           Scheduler.CurrentThread.Schedule(() =>
           {
              mSecond.Subscribe(internalObserver);
           });
        });
      }
   }

odd1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: Subscribe()
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(2)
Merged on 1 -->2
odd1: OnCompleted()
odd1: dispose()
even1: OnNext(4)
Merged on 1 -->4
even1: OnCompleted()
Merged on 1   completed
even1: dispose()

或者当我使用 scheduleAsync 时:

     Scheduler.CurrentThread.ScheduleAsync(async (s,t) =>
     {
        retVal.Add(mSource.Subscribe(internalObserver));
        await s.Yield();
        retVal.Add(mSecond.Subscribe(internalObserver));
        await s.Yield();
        return disposable.Empty;
     });

odd1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: Subscribe()
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(2)
Merged on 1 -->2
odd1: OnCompleted()
odd1: dispose()
even1: OnNext(4)
Merged on 1 -->4
even1: OnCompleted()
Merged on 1   completed
even1: dispose()

在这两种情况下,对第一个 subscribe 的调用都会在对下一个 observable 的 Subscribe 调用之前产生对 OnNext 的调用。我错过了什么?非常感谢任何帮助。

解决方法

我认为写这个问题有助于我更好地理解这个问题。我不需要递归调度或调用 yield。我只需要安排两个电话订阅:

      Scheduler.CurrentThread.Schedule(() =>
     {
        mSource.Subscribe(internalObserver);
        mSecond.Subscribe(internalObserver);
     });

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...