订阅后连接到ConnectableFlowable

问题描述

我正在使用RxJava2进行远程数据源的响应处理:我有 import json # Decoding json data = json.loads({"id": 1,"interviewer": "hengtw1","incidenttwg1": {"id": 5,"child_occupation": [6]}}) print(data["incidenttwg1"]["child_occupation"]) # this will print [6] (list) print(data["incidenttwg1"]["child_occupation"][0]) # this will print 6 (list item) 的远程数据源,其中Publisher<T>是数据项,我的使用者是T,现在我想通过接受Publisher<T>将其保存在本地来处理远程源。

这些功能的原型是:

Publisher<T>

我正在使用 public static void main(String[] args) throws Exception { accept(fetch(remote()).flatMapPublisher(x -> x)) .blockingAwait(); System.out.println("done"); } /** * Prototype of remote data source. * @return Publisher from network */ private static Single<? extends Publisher<Integer>> remote() { return Single.just(Flowable.just(1,2,3,4)); } /** * Prototype of consumer: accepts reactive data stream and print to stdout. * @param data Data source */ private static Completable accept(Publisher<Integer> data) { return Flowable.fromPublisher(data).doOnNext(x -> System.out.printf("received %s\n",x)) .ignoreElements(); } /** * Fetch remote resource from network and save locally. * @param remote Publisher of data from network */ private static Single<? extends Publisher<Integer>> fetch(Single<? extends Publisher<Integer>> remote) { return remote.flatMap( pub -> { final ConnectableFlowable<Integer> conPub = Flowable.fromPublisher(pub).publish(); saveLocally(conPub).subscribe( // we don't care if `saveLocally` fails,just continue fetching () -> System.out.println("Saved locally successfully"),err -> System.out.printf("Failed to save locally: %s\n",err) ); return Single.fromCallable( () -> { conPub.connect(); return conPub; } ); } ); } private static Completable saveLocally(Publisher<Integer> source) { // emulating save operation return Flowable.fromPublisher(source) .flatMapCompletable(x -> Completable.fromAction(() -> System.out.printf("save: %d\n",x))); } 在本地保存远程ConnectableFlowable并返回给消费者,但是当我使用Publisher返回发行人{{1} }方法显示:

ConnectableFlowable.connect()

消费者没有收到任何数据吗?

我发现调用main()有一个肮脏的解决方法,但要使此原型正常工作有些延迟:

save: 1
save: 2
save: 3
save: 4
Saved locally successfully

通过这种方式,消费者可以接收所有数据:

connect()

但是,延迟修复代码并不可靠。

原始代码有什么问题,我该如何正确解决?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...