在流方法链中添加SubscribeOn和ObserveOn之后,未触发

问题描述

我正在使用rxjava3,但不太了解为什么在添加ObserveOnSubscribeOn之后没有在流中调用方法。

这是示例Java代码:

package mytestapp.error;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

/**
 * Hello world!
 *
 */
public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");
        String apiUrl = "myApiUrl";
        try {
            App app = new App();
            app.syncNow(apiUrl);
        } catch (JSONException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private void syncNow(String apiUrl) throws JSONException,IOException {
        createOrAlterTable().observeOn(Schedulers.newThread())
                .switchMap(d -> menuTableRecords(apiUrl))
                .observeOn(Schedulers.newThread()).subscribe(res -> {
                    System.out.println(res);

                },onError -> {
                    System.out.println(onError);
                },() -> {
                    System.out.println("Completed!!");
                });
        ;
    }

    private @NonNull Observable<Object> createOrAlterTable()
            throws IOException,JSONException {
        // Read table from backend
        // Read last sync file
        // get user data
        return Observable.zip(readTableFromBackend(),readLastSyncFile(),getUserData(),(s1,s2,s3) -> readTableFromBackendZipperFun(s1,s3)).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io()).map(
                d -> d);
    }

    private @NonNull Observable<String> readTableFromBackend()
            throws JSONException,IOException {
        return Observable.fromArray("testing");
    }

    private @NonNull Observable<JSONObject> readLastSyncFile()
            throws JSONException {
        return Observable.fromArray(new JSONObject());
    }

    private @NonNull Observable<Boolean> getUserData() throws JSONException {
        return Observable.fromArray(true);
    }

    private JSONArray readTableFromBackendZipperFun(String sqlliteDDL,JSONObject lastFV,boolean userDataFlag) throws JSONException {
        System.out.println("zip ops");
        return new JSONArray();
    }

    private @NonNull Observable<String> menuTableRecords(String apiUrl)
            throws JSONException,IOException {
        
        return Observable.fromArray("MENU_TABLE_RECORDS");
    }
}

我想在单独的线程上执行每个方法,并在不同的线程上执行订阅。

造成问题的原因是什么?如何解决上述情况?

谢谢。

解决方法

看起来您不必等待main方法中的异步活动完成,而您的应用程序只是退出了。推荐读物:https://github.com/ReactiveX/RxJava#simple-background-computation

在这种特殊情况下,您可以使用blockingSubscribe而不是subscribe来解决此问题。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...