如何在kafka宕机时停止@InboundChannelAdapter的轮询以防止数据丢失?

问题描述

我正在使用 Spring Cloud 数据流。

@Bean
@InboundChannelAdapter(channel = TbeSource.PR1,poller = @Poller(fixedDelay = "2"))
public supplier<Product> getProductSource(ProductBuilder dataAccess) {

    return ()->dataAccess.getNext();
        
    };

如果 kafka 突然宕机,那么我们如何停止这种轮询行为以防止数据丢失?

当我在测试时,即使 kafka 宕机,数据也不断从数据库中读取,并不断尝试将记录发送到 kafka?

预期性能是在 kafka 宕机后停止数据轮询..

有什么可能的方法来实现吗?

解决方法

@Poller@InboundChannelAdapter 可以配置为 errorChannel

/**
 * @return The the bean name of default error channel
 * for the underlying {@code MessagePublishingErrorHandler}.
 * @since 4.3.3
 */
String errorChannel() default "";

因此,每当在该 TbeSource.PR1 通道上的流下游发生异常时,它都会被传递到提供的错误通道以用于其上的某些错误流。

在那里,您可以按照逻辑停止为该 SourcePollingChannelAdapter@InboundChannelAdapter 组合创建的 Supplier。在这种情况下,bean id 是这样的:[CONFIGURATION_CLASS_BEAN_NAME.getProductSource.inboundChannelAdapter]。请参阅此处了解更多信息:https://docs.spring.io/spring-integration/reference/html/configuration.html#annotations_on_beans。正如它所说,您也可以使用 @EndpointId 来简化依赖注入例程的生活。

确保重新抛出异常,让数据库事务回滚以避免数据丢失!