RxJava2-间隔和调度程序

问题描述

比方说,我有一个间隔,并且给了它一个CalculationScheduler。像这样:

import matplotlib.pyplot as plt


x1,y1 = 0.3,0.3
x2,y2 = 0.6,0.6

fig,axs = plt.subplots(5,5)

angle = 0

for ax in axs.flat:
    ax.plot([x1,x2],[y1,y2],".")
    ax.annotate("",xy=(x1,y1),xycoords='data',xytext=(x2,y2),textcoords='data',arrowprops=dict(arrowstyle="-",color="0.5",connectionstyle=f"bar,angle={angle},fraction=-0.3",),)
    ax.set_title(angle)
    angle += 15
    ax.set(xlim=(0,1),ylim=(0,aspect=1)

fig.tight_layout(pad=0.2)

plt.show()

然后,平面图{...}中发生的所有事情也都将安排在计算线程上吗?

在Observable.interval(长initialDelay,长周期,TimeUnit单位,调度程序调度程序)的源中,它说:

Observable
    .interval(0,1,TimeUnit.SECONDS,computationScheduler)
    .flatMap { ... }

作为RxJava的初学者,我很难理解此注释。我知道间隔计时器/等待逻辑发生在计算线程上。但是,关于发射项目的最后一部分是否也意味着发射的项目将在同一线程上被消耗?还是为此需要一个observeOn?像这样:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

如果我希望在计算线程上处理发射,是否需要observonOn?

解决方法

这很容易验证:只需打印当前线程以查看操作员在哪个线程上执行:

Observable.just(1,2,3,4,5,6,7,8,9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这将始终打印:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

按顺序处理,因为所有操作都在单个线程中-> main

observeOn将更改下游执行线程:

Observable.just(1,9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

这次的结果对于每次执行都会有所不同,但是flatmapsubscribe将在不同的线程中处理:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

interval将充当observeOn并更改下游执行线程(调度程序):

Observable.interval(0,1,TimeUnit.SECONDS,Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这次执行是在计算调度程序的一个线程内按顺序执行的:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

interval默认情况下将使用计算调度程序,您无需将其作为参数传递,并且不需要observeOn