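Problem description
Two Spring Boot microservices need to interact to implement a bulk email feature. Microservice 1, CollectorService, collects information about clients, and microservice 2, NotificationService, sends documents through an SMTP service. NotificationService cannot handle the load produced by CollectorService. I would like to understand how NotificationService can signal CollectorService to slow down. My code currently looks like this.
CollectorService calls NotificationService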
final WebClient client = WebClient.builder()
        .baseUrl(BASE_URL.get())
        .defaultHeaders(httpHeaders -> httpHeaders.set("requestId", requestId))
        .build();
Mono<List> responseFlux = client.post()
        .uri(EXTERNAL_API_PATH.get() + "/withattFluxBackPressure")
        .contentType(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromValue(message))
        .retrieve()
        .bodyToMono(List.class);
NotificationService handles the request
@PostMapping(value = "/externalNotification/withattFluxBackPressure")
public List<NotificationResponse> sendMailMessageWAttFlux(@RequestBody List<EmailTemplate> mailMessages) throws Exception {
    log.info("Processing email sending request async...");
    // byte[] byteArrayResource = get file content from S3
    List<NotificationResponse> response = new ArrayList<>();
    Flux<NotificationResponse> responseFlux = Flux.fromIterable(mailMessages).flatMap(messages -> {
        try {
            return Flux.just(notificationService.sendMailWAtt(messages, byteArrayResource).map(NotificationResponse::error).orElse(NotificationResponse.success()));
        } catch (IOException e) {
            log.error("Exception", e);
            return Flux.just(new NotificationResponse().error(e.getMessage()));
        }
    });
    responseFlux.subscribe(new BaseSubscriber<NotificationResponse>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            log.info("called hookOnSubscribe......");
            subscription.request(1);
        }
        @Override
        protected void hookOnNext(NotificationResponse value) {
            log.info("called hookOnNext.......{} ", value);
            response.add(value);
            request(1);
        }
        @Override
        protected void hookOnComplete() {
            log.info("called hookOnComplete.......");
        }
        @Override
        protected void hookOnError(Throwable throwable) {
            log.info("called hookOnError.......");
            if (throwable instanceof ConnectException) {
                log.error("called ConnectException.......");
            }
            if (throwable instanceof ResourceAccessException) {
                log.error("called ResourceAccessException.......");
            }
            if (throwable instanceof ConnectTimeoutException) {
                log.error("called ConnectTimeoutException.......");
            }
            if (throwable instanceof io.netty.channel.ConnectTimeoutException) {
                log.error("called netty ConnectTimeoutException.......");
            }
        }
    });
    return response;
}
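For reference, the request(n) demand mechanism that the BaseSubscriber above relies on can be shown in isolation. The sketch below is not the Reactor code from the question: it uses the JDK's java.util.concurrent.Flow API so that it runs without extra dependencies, and BatchDemandSketch, BatchingSubscriber and the "mail-N" items are hypothetical names chosen for illustration. The subscriber asks for 5 items, handles them, then asks for the next 5, while the publisher's small buffer makes submit() block when the subscriber falls behind. Note that this kind of demand only reaches a producer in the same reactive pipeline; in the controller above, the whole List has already arrived as the request body before the Flux is created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BatchDemandSketch {

    /** Requests batchSize items, handles them, then requests the next batch. */
    static final class BatchingSubscriber implements Flow.Subscriber<String> {
        final List<Integer> batchSizes = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batchSize;
        private Flow.Subscription subscription;
        private int inCurrentBatch = 0;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(batchSize); // initial demand: one batch
        }

        @Override
        public void onNext(String item) {
            // Placeholder for sending one email.
            inCurrentBatch++;
            if (inCurrentBatch == batchSize) {
                batchSizes.add(inCurrentBatch);
                inCurrentBatch = 0;
                subscription.request(batchSize); // batch finished: ask for the next one
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            if (inCurrentBatch > 0) {
                batchSizes.add(inCurrentBatch); // record the final partial batch
            }
            done.countDown();
        }
    }

    /** Publishes `total` items and returns the sizes of the batches the subscriber handled. */
    static List<Integer> run(int total, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Small buffer: submit() blocks the producer when the subscriber lags behind.
            try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, 8)) {
                publisher.subscribe(subscriber);
                for (int i = 0; i < total; i++) {
                    publisher.submit("mail-" + i);
                }
            }
            subscriber.done.await();
            return subscriber.batchSizes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(12, 5));
    }
}
```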
1. When NotificationService is overloaded, how can it signal CollectorService to slow down (backpressure)? (Ideal scenario)
2. Alternatively, NotificationService processes 5 emails, then signals or requests CollectorService for the next 5.
Thanks for your time!
Solution
No working solution to this problem has been found yet.
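One direction that may be worth trying, offered as a sketch rather than a verified fix: because the two services talk over plain HTTP request/response, the overload signal can be made explicit. The receiver rejects work it cannot take with HTTP 429 (Too Many Requests) plus a Retry-After hint, and the caller waits before retrying. The code below models this with plain JDK classes only; OverloadSignalSketch, AdmissionGate, Reply and callWithRetry are hypothetical names, and the integer status codes stand in for real HTTP responses. In a Spring controller the rejection would presumably be returned with HttpStatus.TOO_MANY_REQUESTS.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class OverloadSignalSketch {

    static final int OK = 200;
    static final int TOO_MANY_REQUESTS = 429;

    /** HTTP-like reply: a status code plus a Retry-After hint in seconds. */
    static final class Reply {
        final int status;
        final int retryAfterSeconds;

        Reply(int status, int retryAfterSeconds) {
            this.status = status;
            this.retryAfterSeconds = retryAfterSeconds;
        }
    }

    /** Receiver side: admits at most maxInFlight requests at once and rejects the rest. */
    static final class AdmissionGate {
        private final Semaphore permits;
        private final int retryAfterSeconds;

        AdmissionGate(int maxInFlight, int retryAfterSeconds) {
            this.permits = new Semaphore(maxInFlight);
            this.retryAfterSeconds = retryAfterSeconds;
        }

        Reply handle(Runnable sendEmails) {
            if (!permits.tryAcquire()) {
                // Overloaded: tell the caller to slow down instead of queueing more work.
                return new Reply(TOO_MANY_REQUESTS, retryAfterSeconds);
            }
            try {
                sendEmails.run();
                return new Reply(OK, 0);
            } finally {
                permits.release();
            }
        }
    }

    /** Caller side: while the reply is 429, waits as long as the receiver asked, then retries. */
    static Reply callWithRetry(Supplier<Reply> call, int maxAttempts) {
        Reply reply = call.get();
        for (int attempt = 1; attempt < maxAttempts && reply.status == TOO_MANY_REQUESTS; attempt++) {
            try {
                Thread.sleep(reply.retryAfterSeconds * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return reply; // give up early and hand back the last reply
            }
            reply = call.get();
        }
        return reply;
    }

    public static void main(String[] args) {
        AdmissionGate gate = new AdmissionGate(1, 0);
        Reply reply = callWithRetry(() -> gate.handle(() -> { /* send a batch here */ }), 3);
        System.out.println(reply.status);
    }
}
```

The semaphore limit plays the role of "how many batches NotificationService can work on at once"; the caller never needs to know that number, it only reacts to the 429 replies.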