问题描述
我正在开发一个使用反应堆库与Google pubsub连接的应用程序。所以我有大量的信息。无论发生什么情况,我都希望它始终从队列中使用:这意味着要处理所有错误,以免终止通量。我正在考虑(极不可能)与pubsub的连接丢失或可能导致刚创建的Flux发出错误信号的事件。我想出了这个解决方案:
private final PubSubReactiveFactory pubSubReactiveFactory;
private final String requestSubscription;
private final Long requestPollTime;
private final Flux<AckNowledgeablePubsubMessage> requestFlux;
@Autowired
public FluxContainer(/* Field args...*/) {
// init stuff...
this.requestFlux = initRequestFlux();
}
private Flux<AckNowledgeablePubsubMessage> initRequestFlux() {
return pubSubReactiveFactory.poll(requestSubscription,requestPollTime);
.doOnError(e -> log.error("Fatal error: Could not retrieve message from queue. Resetting flux",e))
.onErrorResume(e -> initRequestFlux());
}
@EventListener(ApplicationReadyEvent.class)
public void configureFluxAndSubscribe() {
log.info("Setting up requestFlux...");
this.requestFlux
.doOnNext(AckNowledgeablePubsubMessage::ack)
// ...many more concatenated calls handling flux
}
这有意义吗?我担心内存分配(我依靠gc清理东西)。欢迎任何评论。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)