Flux Reactive Microservice - 一个微服务向另一个微服务发出有关背压的信号

问题描述

两个 Springboot 微服务需要相互交互才能实现群发邮件功能。微服务 1:CollectorService 收集有关客户端和 微服务 2:NotificationService 通过 SMTP 服务发送文档。 NotificationService 无法处理 CollectorService 产生的负载。现在我想了解 NotificationService 如何向 CollectorService 发出信号以减慢速度。我的代码目前如下所示。

CollectorSerivce 调用 NotificationService

final WebClient client = WebClient.builder().baseUrl(BASE_URL.get()).defaultHeaders(httpHeaders -> {
            httpHeaders.set("requestId",requestId);
        }).build();

            Mono<List> responseFlux = client.post().uri(EXTERNAL_API_PATH.get() + "/withattFluxBackPressure")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(BodyInserters.fromValue(message)).retrieve().bodyToMono(List.class);

NotificationService 处理请求

 @PostMapping(value = "/externalNotification/withattFluxBackPressure")
    public List<NotificationResponse> sendMailMessageWAttFlux(@RequestBody List<EmailTemplate> mailMessages) throws Exception{
        log.info("Processing email sending request async...");

       
//        byte[] byteArrayResource = get file content from S3
        List<NotificationResponse> response = new ArrayList<>();

        Flux<NotificationResponse> responseFlux = Flux.fromIterable(mailMessages).flatMap(messages -> {
            try {
                return Flux.just(notificationService.sendMailWAtt(messages,byteArrayResource).map(NotificationResponse::error).orElse(NotificationResponse.success()));
            } catch (IOException e) {
                log.error("Exception",e);
                return Flux.just(new NotificationResponse().error(e.getMessage()));
            }
        });

        responseFlux.subscribe(new BaseSubscriber<NotificationResponse>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        log.info("called hookOnSubscribe......");
                        subscription.request(1);
                    }

                    @Override
                    protected void hookOnNext(NotificationResponse value) {
                        log.info("called hookOnNext.......{} ",value);
                        response.add(value);
                        request(1);

                    }

                    @Override
                    protected void hookOnComplete() {
                        log.info("called hookOnComplete.......");
                    }

                    @Override
                    protected void hookOnError(Throwable throwable) {
                        log.info("called hookOnError.......");
                        if(throwable instanceof ConnectException) {
                            log.error("called ConnectException.......");
                        }
                        if(throwable instanceof ResourceAccessException) {
                            log.error("called ResourceAccessException.......");
                        }
                        if(throwable instanceof ConnectTimeoutException) {
                            log.error("called ConnectTimeoutException.......");
                        }
                        if(throwable instanceof io.netty.channel.ConnectTimeoutException) {
                            log.error("called netty ConnectTimeoutException.......");
                        }
                    }
                });

        return response;
}```

 1. When NotificationService overloads,how can it signal(backpressure) CollectorService to slow down? (Ideal scenario)
 2. Alternatively,NotificationService processes 5 emails then signal/request CollectorService for the next 5.

Thanks for your time!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)