使用webflux和Reactor的正确方法是什么

问题描述

我正在倚靠webflux和反应堆。获得了以下三种测试方法。 “ documentOperations.findById”和“ documentOperations.delete”是两个数据库操作。我知道test1不好,因为两个数据库操作放在一个异步方法中。我的问题是:
test2和test3对系统性能有相同的影响吗?或者换句话说,哪个更好?

private Mono<ServerResponse> test1(ServerRequest request,Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> {
                Document document = documentOperations.findById(doc.getId());
                documentOperations.delete(document.getId());
                return ServerResponse.noContent().build();
            });
}

private Mono<ServerResponse> test2(ServerRequest request,Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> {
                return Mono.just(documentOperations.findById(doc.getId()))
                        .flatMap(document -> {
                            documentOperations.delete(document.getId());
                            return ServerResponse.noContent().build();
                        });
            });
}

private Mono<ServerResponse> test3(ServerRequest request,Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> {
                return Mono.just(documentOperations.findById(doc.getId()));
            }).flatMap(document -> {
                documentOperations.delete(document.getId());
                return ServerResponse.noContent().build();
            });
}

解决方法

基于DocumentOperations是反应性的假设,我希望代码看起来像下面的代码。

private Mono<ServerResponse> test3(ServerRequest request,Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> documentOperations.findById(doc.getId()))
            .flatMap(document -> documentOperations.delete(document.getId()))
            .then(ServerResponse.noContent().build());
}
,

以上示例都不是好的。您所有的数据库调用都返回具体类型,这意味着它们都阻塞了调用。

// returns the concrete type
// thread does the call,needs to wait until we get the value (document)
Document document = documentOperations.findById("1");

如果未阻止,则返回Mono<T>Flux<T>

// Returns a mono,so we know it's not blocking. 
// We can chain on actions with for example flatMap etc.
Mono<Document> document = documentOperations.findById("1");

如果必须使用阻塞数据库(例如oracle数据库等),则需要将其放置在它自己的整个线程上,这样它就不会阻塞任何主工作线程。可以使用调度程序来完成。因此,在此示例中,当客户端订阅时,它将被放置在单独的线程上。

Mono<Document> document = Mono.fromCallable(() -> documentOperations.findById("1"))
    .subscribeOn(Schedulers.boundedElastic());;

因此,您的示例:

private Mono<ServerResponse> test3(ServerRequest request,Contexts contexts) {
    return request.body(bodyDocumentExtractor)
        .flatMap(doc -> Mono.fromCallable(() -> documentOperations.findById(doc.getId()))
        .flatMap(document -> Mono.fromCallable(() -> documentOperations.delete(document.getId()))
                .then(ServerResponse.noContent().build());
        ).subscribeOn(Schedulers.boundedElastic());
}

Reactor documentation - Wrap blocking calls

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...