带有断路器的 Kafka 消费者,使用 Resilience4j 的重试模式

问题描述

我需要一些帮助来了解我如何使用 Spring Boot、Kafka、Resilence4J 提出解决方案,以实现来自我的 Kafka 消费者的微服务调用。假设微服务关闭,那么我需要使用断路器模式通知我的 Kafka 消费者停止获取消息/事件,直到微服务启动并运行。

解决方法

如果您使用的是 Spring Kafka,则可以使用 pause 类的 resumeConcurrentMessageListenerContainer 方法。 您可以将 EventListener 附加到 CircuitBreaker,它侦听状态转换并暂停或恢复事件处理。将 CircuitBreakerRegistry 注入您的 bean:

circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(
                        event -> {
                            switch (event.getStateTransition()) {
                                case CLOSED_TO_OPEN:
                                    container.pause();
                                case OPEN_TO_HALF_OPEN:
                                    container.resume();
                                case HALF_OPEN_TO_CLOSED:
                                    container.resume();
                                case HALF_OPEN_TO_OPEN:
                                    container.pause();
                                case CLOSED_TO_FORCED_OPEN:
                                    container.pause();
                                case FORCED_OPEN_TO_CLOSED:
                                    container.resume();
                                case FORCED_OPEN_TO_HALF_OPEN:
                                    container.resume();
                                default:
                            }
                        }
                );