从CompletionStage返回Single的正确方法

问题描述

我正在使用RxJava2,Micronaut和Cassandra处理反应流。我是rxjava的新手,不确定以最佳异步方式返回List Person的正确方法是什么?

数据来自Cassandra Dao界面

public interface PersonDAO {    
    @Query("SELECT * FROM cass_drop.person;")
    CompletionStage<MappedAsyncPagingIterable<Person>> getAll(); 
}

被注入到micronaut控制器中

   return Single.just(personDAO.getAll().toCompletableFuture().get().currentPage())
            .subscribeOn(Schedulers.io())
            .map(people -> HttpResponse.ok(people));

OR

return Single.just(HttpResponse.ok())
        .subscribeOn(Schedulers.io())
        .map(it -> it.body(personDAO.getAll().toCompletableFuture().get().currentPage()));

或切换到RxJava3

    return Single.fromCompletionStage(personDAO.getAll())
            .map(page -> HttpResponse.ok(page.currentPage()))
            .onErrorReturn(throwable -> HttpResponse.ok(Collections.emptyList()));

解决方法

不是RxJava和Cassandra的专业人士:

在第一个和第二个示例中,您正在阻塞用CompletionStage执行get的线程,即使您在IO线程中执行此操作,我也不建议这样做。

您还使用了Single,它只能发出一个值或一个错误。由于您想返回List,因此我建议至少要使用Observable

第三点,来自Cassandra的结果是分页的,我不知道这是否是故意的,但是您只列出了第一页,而错过了其他页面。

我会尝试下面的解决方案,我一直使用IO线程(该操作在IO中可能会花费很多),然后遍历Cassandra获取的页面:


    /* the main method of your controller */
    @Get()
    public Observable<Person> listPersons() {
        return next(personDAO.getAll()).subscribeOn(Schedulers.io());
    }

    private Observable<Person> next(CompletionStage<MappedAsyncPagingIterable<Person>> pageStage) {
        return Single.fromFuture(pageStage.toCompletableFuture())
                .flatMapObservable(personsPage -> {
                    var o = Observable.fromIterable(personsPage.currentPage());
                    if (!personsPage.hasMorePages()) {
                        return o;
                    }
                    return o.concatWith(next(personsPage.fetchNextPage()));
                });
    }
,

如果您打算使用 reactor 而不是 RxJava,那么您可以尝试一下 cassandra-java-driver-reactive-mapper

语法相当简单,仅在编译时有效。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...