问题描述
我正在使用 Spring Cloud 数据流。
@Bean
@InboundChannelAdapter(channel = TbeSource.PR1,poller = @Poller(fixedDelay = "2"))
public supplier<Product> getProductSource(ProductBuilder dataAccess) {
return ()->dataAccess.getNext();
};
如果 kafka 突然宕机,那么我们如何停止这种轮询行为以防止数据丢失?
当我在测试时,即使 kafka 宕机,数据也不断从数据库中读取,并不断尝试将记录发送到 kafka?
预期性能是在 kafka 宕机后停止数据轮询..
有什么可能的方法来实现吗?
解决方法
@Poller
的 @InboundChannelAdapter
可以配置为 errorChannel
:
/**
* @return The the bean name of default error channel
* for the underlying {@code MessagePublishingErrorHandler}.
* @since 4.3.3
*/
String errorChannel() default "";
因此,每当在该 TbeSource.PR1
通道上的流下游发生异常时,它都会被传递到提供的错误通道以用于其上的某些错误流。
在那里,您可以按照逻辑停止为该 SourcePollingChannelAdapter
和 @InboundChannelAdapter
组合创建的 Supplier
。在这种情况下,bean id 是这样的:[CONFIGURATION_CLASS_BEAN_NAME.getProductSource.inboundChannelAdapter]
。请参阅此处了解更多信息:https://docs.spring.io/spring-integration/reference/html/configuration.html#annotations_on_beans。正如它所说,您也可以使用 @EndpointId
来简化依赖注入例程的生活。
确保重新抛出异常,让数据库事务回滚以避免数据丢失!