android – RxJava主题在不正确的调度程序上发出

我有一个以下课程,我作为一个单身人士持有:

public class SessionStore {
    Subject<Session,Session> subject;

    public SessionStore() {
       subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
    }

    public void set(Session session) {
        subject.onNext(session);
    }

    public Observable<UserSession> observe() {
        return subject.distinctUntilChanged();
    }
}

在活动中,我观察会话并对每次更改执行网络操作:

private Subscription init() {
    return sessionStore
            .observe()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(new Func1<Session,Observable<Object>>() {
                @Override
                public Observable<Object> call(Session session) {
                    return retrofitService.getAThing();
                }
            })
            .subscribe(...);
}

当我订阅会话存储时,主题立即发布在io()上,因为它是BehavIoUrSubject,订阅者在mainThread()上执行.

当我在订阅它时调用sessionStore.set(new AnotherSession())时会出现问题. IMO应该在io()调度程序上执行init()中定义的流.但是,相反的是,流在调用subject.onNext()的同一线程上执行.因为我在flatMap()中进行网络操作而导致networkonmainthreadException.

我是否理解主题错了?我这样滥用他们吗?那么适当的解决方案是什么?

我也尝试用observe()方法中的Observable.fromEmitter()替换整个主题方法,但令人惊讶的是输出是一样的.

解决方法

请查看“ Reactive Programming with RxJava”一书中的以下部分

认情况下,在Subject上调用onNext()会直接传播到所有Observer的onNext()回调方法.这些方法具有相同的名称并不奇怪.在某种程度上,在Subject上调用onNext()会在每个Subscriber上间接调用onNext().

让我们回顾一下:
如果从Thread-1调用Subject上的onNext,它将从Thread-1调用onNext到订阅者. onSubscribe将被删除.

首先要做的事情是:
订阅发生在哪个线程上:

retrofitService.getAThing()

我会猜测,并说它是调用线程.这将是observeOn中描述的线程,它是Android-UI-Loop.

observeOn下的每个值都将从调度程序指定的Thread-a转移到Thread-b. UI-Loop上的observeOn应该在订阅之前发生.订阅中将接收的每个值都将位于UI-Loop上,这不会阻止UI线程或以异常结束.

Pease看一下示例代码输出

class SessionStore {
    private Subject<String,String> subject;

    public SessionStore() {
        subject = BehaviorSubject.create("wurst").toSerialized();
    }

    public void set(String session) {
        subject.onNext(session);
    }

    public Observable<String> observe() {
        return subject
                .asObservable()
                .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
                .distinctUntilChanged();
    }
}

@Test
public void name() throws Exception {
    // init
    SessionStore sessionStore = new SessionStore();

    TestSubscriber testSubscriber = new TestSubscriber();
    Subscription subscribe = sessionStore
            .observe()
            .flatMap(s -> {
                return Observable.fromCallable(() -> {
                    System.out.println("flatMap Thread:: " + Thread.currentThread());
                    return s;
                }).subscribeOn(Schedulers.io());
            })
            .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
            .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
            .subscribe(testSubscriber); // Do UI-Stuff in subscribe

    new Thread(() -> {
        System.out.println("set on Thread:: " + Thread.currentThread());
        sessionStore.set("123");
    }).start();

    new Thread(() -> {
        System.out.println("set on Thread:: " + Thread.currentThread());
        sessionStore.set("345");
    }).start();

    boolean b = testSubscriber.awaitValueCount(3,3_000,TimeUnit.MILLISECONDS);

    Assert.assertTrue(b);
}

输出::

Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,main]
After flatMap Thread:: Thread[RxIoScheduler-2,main]
set on Thread:: Thread[Thread-1,main]
set on Thread:: Thread[Thread-0,main]
Receiving value on Thread:: Thread[Thread-1,main]

相关文章

Android性能优化——之控件的优化 前面讲了图像的优化,接下...
前言 上一篇已经讲了如何实现textView中粗字体效果,里面主要...
最近项目重构,涉及到了数据库和文件下载,发现GreenDao这个...
WebView加载页面的两种方式 一、加载网络页面 加载网络页面,...
给APP全局设置字体主要分为两个方面来介绍 一、给原生界面设...
前言 最近UI大牛出了一版新的效果图,按照IOS的效果做的,页...