@Async API端点Spring Webflux

问题描述

我需要编写Spring Webflux端点(路由器功能)以将邮件发送到邮件收件人列表。 UI将选择邮件收件人列表,并将该列表发送到我将要编写的API。我希望以某种方式实施端点,以便在收到请求后立即将响应发送到UI,表明正在发送电子邮件。发送响应后,我应该继续异步进行邮件发送工作。我不能像在Spring MVC中那样使用@async注释,因为它是反应式世界中的反模式。

自从我使用spring webflux开发API以来,如何发送回复。

我的代码中有一个下面的结构。

Router.java

@Bean
public RouterFunction<ServerResponse> sendEmail() {
 return route(POST("/email").and(accept(APPLICATION_JSON)),handler::sendEmail);
}

Handler.java

@Autowired
EmailService emailService;

public Mono<ServerResponse> sendEmail(ServerRequest request) {
    Mono<PojoA> pojoAMono = request.bodyToMono(PojoA.class);
    return pojoAMono.flatMap(pojoA -> {
       return emailService.sendEmail(pojoA).flatMap(mailSent -> {
         return  ServerResponse
        .status(HttpStatus.OK)
        .contentType(MediaType.APPLICATION_JSON)
        .body("Mails are being sent",String.class));
       });
    });
    
}

解决方法

您可以直接将响应返回给呼叫者,并在该流完成后运行作为副作用的电子邮件发送。这可以通过在流完成后执行的doFinally来完成。

因此您的代码可能如下所示:

public Mono<ServerResponse> sendEmail(ServerRequest request) {
    return request.bodyToMono(PojoA.class)
            .map(this::sendEmailSideEffect)
            .flatMap(pojoA -> ServerResponse
                    .status(HttpStatus.OK)
                    .contentType(MediaType.APPLICATION_JSON)
                    .body("Mails are being sent",String.class));
}

private Mono<PojoA> sendEmailSideEffect(PojoA pojoA) {
    return Mono.just(pojoA)
            .doFinally(signalType -> emailService.sendEmails(pojoA));
}
,

您应该构建响应,然后处理数据(发送电子邮件)。

类似这样的东西:

@Bean
public RouterFunction<ServerResponse> sendEmail() {
    return route(POST("/test").and(accept(APPLICATION_JSON)),this::someMethod);
}

Mono<ServerResponse> someMethod(ServerRequest serverRequest) {
    return ServerResponse.ok().build()
            .doOnNext(r -> Mono.just("data") //doing some process like send email
                    .delayElement(Duration.ofSeconds(2))
                    .subscribeOn(Schedulers.parallel())
                    .log()
                    .subscribe());
}

出于测试目的,我延迟了您发送响应后看到的数据处理。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...