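Spring Boot Kafka - Kafka metrics not available in /actuator/prometheus

Question

I want to monitor Kafka metrics, but unfortunately nothing Kafka-related shows up under the /actuator/prometheus endpoint. Is anything missing from my setup?

Application dependencies: Kotlin 1.4.31, Spring Boot 2.3.9, Spring Kafka 2.6.7, Reactor Kafka 1.2.5, Kafka Clients 2.5.1

Application configuration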

    management:   
      server:
        port: 8081   
      endpoints:
        web:
          exposure:
            include: health,info,metrics,prometheus
    
    spring:
      jmx:
        enabled: true
      kafka:
        bootstrap-servers: ...
        consumer:
          group-id: my-service
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          ack-mode: manual
        ssl:
          key-store-location: ...
          key-store-password: ...
        security:
          protocol: SSL

My receiver looks like this:

    @Bean
    fun someEventReceiver(): SomeEventReceiver =
        KafkaReceiver.create(
            ReceiverOptions.create<String,SomeEvent>(kafkaProperties.buildConsumerProperties())
                .withValueDeserializer(SomeEventDeserializer())
                .subscription(listOf(serviceProperties.kafka.topics.someEvent))
        )

And the listener:

    @EventListener(ApplicationStartedEvent::class)
    fun onSomeEvent() {
        someEventReceiver
            .receive()
            .groupBy { it.receiverOffset().topicPartition() }
            .publishOn(Schedulers.boundedElastic())
            .flatMap { someEvent ->
                someEvent
                    .publishOn(Schedulers.boundedElastic())
                    .delayUntil(::handleEvent)
                    .doOnNext { it.receiverOffset().acknowledge() }
                    .retryWhen(Retry.backoff(10, Duration.ofMillis(100)))
            }
            .retryWhen(Retry.indefinitely())
            .subscribe()
    }

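Answer

Unlike spring-kafka, reactor-kafka currently has no Micrometer integration.

If you also have spring-kafka on the classpath, you can use its MicrometerConsumerListener to bind a KafkaClientMetrics to the meter registry (or you can do the binding yourself).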
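
If you would rather do the binding yourself, a minimal sketch could look like the following. It assumes Micrometer's `KafkaClientMetrics` binder (from `micrometer-core`) is on the classpath. The class `ManualKafkaMetricsBinding` and its two helper methods are hypothetical names introduced for illustration, not part of any library.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;

// Hypothetical helper class; only KafkaClientMetrics, MeterRegistry,
// Consumer and KafkaReceiver are real library types.
public final class ManualKafkaMetricsBinding {

    private ManualKafkaMetricsBinding() {
    }

    // Binds the Kafka client's own metrics to the given registry.
    // The returned binder should be closed on shutdown to stop its refresh task.
    public static KafkaClientMetrics bindConsumer(Consumer<?, ?> consumer, MeterRegistry registry) {
        KafkaClientMetrics metrics = new KafkaClientMetrics(consumer);
        metrics.bindTo(registry);
        return metrics;
    }

    // Binds the metrics of the consumer that backs a reactor-kafka receiver.
    // Assumption: receive() has already been subscribed, so the underlying
    // consumer exists by the time the returned Mono is subscribed.
    public static <K, V> Mono<KafkaClientMetrics> bindReceiver(KafkaReceiver<K, V> receiver,
                                                               MeterRegistry registry) {
        return receiver.doOnConsumer(consumer -> bindConsumer(consumer, registry));
    }
}
```

Nothing happens until the returned `Mono` is subscribed, for example with `ManualKafkaMetricsBinding.bindReceiver(receiver, registry).subscribe()`.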

Here is an example that uses the Spring listener:

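    import java.util.Collections;
    import java.util.Map;

    import io.micrometer.core.instrument.MeterRegistry;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.MicrometerConsumerListener;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    @SpringBootApplication
    public class So66706766Application {

        public static void main(String[] args) {
            SpringApplication.run(So66706766Application.class, args);
        }

        @Bean
        ApplicationRunner runner(MicrometerConsumerListener<String, String> consumerListener) {
            return args -> {
                ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                            Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                    ConsumerConfig.GROUP_ID_CONFIG, "so66706766"))
                        .withKeyDeserializer(new StringDeserializer())
                        .withValueDeserializer(new StringDeserializer())
                        .subscription(Collections.singletonList("so66706766"));
                KafkaReceiver<String, String> receiver = KafkaReceiver.create(ro);
                // Start consuming first so that the underlying consumer is created.
                receiver.receive()
                        .doOnNext(rec -> {
                            System.out.println(rec.value());
                            rec.receiverOffset().acknowledge();
                        })
                        .subscribe();
                // Hand the consumer to the Spring listener, which binds its metrics to the registry.
                receiver.doOnConsumer(consumer -> {
                    consumerListener.consumerAdded("myConsumer", consumer);
                    return Mono.empty();
                }).subscribe();
            };
        }

        @Bean
        MicrometerConsumerListener<String, String> consumerListener(MeterRegistry registry) {
            return new MicrometerConsumerListener<>(registry);
        }

        @Bean
        NewTopic topic() {
            return TopicBuilder.name("so66706766").partitions(1).replicas(1).build();
        }

    }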

    # HELP kafka_consumer_successful_authentication_total The total number of connections with successful authentication
    # TYPE kafka_consumer_successful_authentication_total counter
    kafka_consumer_successful_authentication_total{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.0
    # HELP jvm_gc_live_data_size_bytes Size of long-lived heap memory pool after reclamation
    # TYPE jvm_gc_live_data_size_bytes gauge
    jvm_gc_live_data_size_bytes 0.0
    # HELP kafka_consumer_connection_creation_rate The number of new connections established per second
    # TYPE kafka_consumer_connection_creation_rate gauge
    kafka_consumer_connection_creation_rate{client_id="consumer-so66706766-1",} 0.07456936193482637
    ...
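
A quick way to confirm that the Kafka meters are actually exported is to filter the scrape text for names starting with `kafka_`. The following is a minimal plain-JDK sketch. The class `KafkaMeterNames` and its method are hypothetical names introduced for illustration, and the sample input reuses lines from the output above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: lists the Kafka meter names found in a Prometheus text scrape.
public class KafkaMeterNames {

    public static Set<String> kafkaMeterNames(String scrape) {
        Set<String> names = new LinkedHashSet<>();
        for (String line : scrape.split("\n")) {
            String trimmed = line.trim();
            // Skip blank lines and "# HELP" / "# TYPE" comment lines.
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            // The meter name ends at the first '{' (labels) or space (value).
            int end = 0;
            while (end < trimmed.length()
                    && trimmed.charAt(end) != '{'
                    && trimmed.charAt(end) != ' ') {
                end++;
            }
            String name = trimmed.substring(0, end);
            if (name.startsWith("kafka_")) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String scrape = String.join("\n",
                "# TYPE kafka_consumer_connection_creation_rate gauge",
                "kafka_consumer_connection_creation_rate{client_id=\"consumer-so66706766-1\",} 0.07456936193482637",
                "jvm_gc_live_data_size_bytes 0.0");
        // prints [kafka_consumer_connection_creation_rate]
        System.out.println(kafkaMeterNames(scrape));
    }
}
```

An empty result on a real scrape means no Kafka meters have been bound to the registry yet.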

I opened an issue: https://github.com/reactor/reactor-kafka/issues/206


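Following @gary-russell's suggestion (thanks again for your help!), I took a slightly different approach to wiring up the listener in order to reduce the amount of code, because my project has many consumers.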

    class KafkaReceiverWithMetrics<K, V>(
        private val receiver: KafkaReceiver<K, V>,
        private val consumerId: String,
        private val metricsListener: MicrometerConsumerListener<K, V>,
    ) : KafkaReceiver<K, V> by receiver {
        // Register the consumer with the metrics listener whenever receive() is subscribed.
        override fun receive(): Flux<ReceiverRecord<K, V>> =
            receiver.receive()
                .doOnSubscribe {
                    receiver
                        .doOnConsumer { consumer -> metricsListener.consumerAdded(consumerId, consumer) }
                        .subscribe()
                }
    }

Then I only need to provide one bean per listener:

    @Bean
    fun someEventReceiver(): SomeEventReceiver =
        KafkaReceiverWithMetrics(
            KafkaReceiver.create(
                ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                    .withValueDeserializer(SomeEventDeserializer())
                    .subscription(listOf(topics.someEvent))
            ),
            topics.someEvent,
            MicrometerConsumerListener(meterRegistry)
        )