问题描述
使用“ observeOn(Scheduler)”,Rxjava可以观察到的似乎是,当项目从不同线程发出时,somtimes会丢失某些项目。
这是代码
val subject = PublishSubject.create<String>()
subject
.observeOn(Schedulers.io())
.subscribe { x: String? -> println(x) }
val t1 = Thread { subject.onNext("1") }
val t2 = Thread { subject.onNext("2") }
t1.start()
t2.start()
我希望控制台必须始终像下面这样打印
1
2
但有时结果仅仅是
1
或
2
有什么理由吗?
此外)如果删除“ observeOn(Schedulers.io())”,结果与我的期望相同。
解决方法
PublishSubject的onXXX
方法不是线程安全的,因此您必须以某种方式序列化对其的访问。最简单的方法是应用toSerialized()
:
调用onNext(Object),onError(Throwable)和onComplete()需要进行序列化(从同一线程调用,或者通过外部序列化方法从不同线程非重叠调用)。适用于所有主题的Subject.toSerialized()方法提供了这样的序列化,并且还防止了重入(即,当使用此主题的下游观察者也希望递归调用此主题的onNext(Object)时。)
val subject = PublishSubject.create<String>().toSerialized()