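How do I add a Micrometer timer to a KafkaListener class created by a KafkaListenerContainerFactory?

Problem

When I try to autowire a Micrometer MeterRegistry into a class that contains a KafkaListener, I get the following error: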

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaConsumer' defined in file [/apps/flux-mariadb-pipeline/build/classes/java/main/com/processor/consumer/KafkaConsumer.class]: Bean instantiation via constructor failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.processor.consumer.KafkaConsumer]: Constructor threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:313)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:294)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1358)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1204)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:226)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:893)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at com.processor.App.main(App.java:10)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.processor.consumer.KafkaConsumer]: Constructor threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:217)
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:117)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:309)
    ... 20 common frames omitted
Caused by: java.lang.NullPointerException: null
    at java.util.Objects.requireNonNull(Objects.java:203)
    at io.micrometer.core.instrument.ImmutableTag.<init>(ImmutableTag.java:35)
    at io.micrometer.core.instrument.Tag.of(Tag.java:29)
    at io.micrometer.core.instrument.Tags.and(Tags.java:74)
    at io.micrometer.core.instrument.Timer$Builder.tag(Timer.java:364)
    at com.processor.consumer.KafkaConsumer.<init>(KafkaConsumer.java:47)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:204)
    ... 22 common frames omitted

The KafkaConsumer class:

public KafkaConsumer(MeterRegistry meterRegistry) {
  eventTimer =
      Timer.builder("travel.time")
          .description("The time it takes for the event to travel.")
          .tag("topic", topic)
          .publishPercentiles(.30, .65, .99)
          .publishPercentileHistogram()
          .minimumExpectedValue(Duration.ofMillis(1))
          .maximumExpectedValue(Duration.ofMillis(5000))
          .register(meterRegistry);
}

@KafkaListener(
    topics = "${spring.kafka.consumer.properties.topic}",
    groupId = "${spring.kafka.consumer.group-id}")
public void consume(
    ConsumerRecord<String, DataRecord> record,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
    @Header(KafkaHeaders.OFFSET) Long offset,
    Acknowledgment ack)
    throws IOException {
  .
  .
  }

I create the KafkaListener container with the following factory:

@Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>
      kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String,String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(8);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(getErrorHandler());

    return factory;
  }

The only way I have found to add the Micrometer registry is the following:

@Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();

    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);

    cf.addListener(
        new MicrometerConsumerListener<>(
            meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

    return cf;
  }

Now I don't know how to access this registry inside the @KafkaListener method.

Solution

at io.micrometer.core.instrument.ImmutableTag.<init>(ImmutableTag.java:35)

.tag("topic", topic)

The topic variable is null.
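
Why a field can still be null inside a constructor can be illustrated without Spring. The following is only a plain-Java sketch under the assumption that topic is populated by field injection; InjectionOrderDemo, Listener, create and init are hypothetical names made up for the illustration, and the reflection step merely stands in for what a container does after it has called the constructor.

```java
import java.lang.reflect.Field;
import java.util.Objects;

public class InjectionOrderDemo {

  // Stand-in for a bean whose field is populated after construction.
  static class Listener {
    String topic; // would carry an injection annotation in a real bean
    final String seenInConstructor;
    String seenAfterInjection;

    Listener() {
      // Runs before any field injection, so topic is still null here.
      seenInConstructor = topic;
    }

    // Plays the role of a post-construction callback.
    void init() {
      seenAfterInjection = Objects.requireNonNull(topic, "topic");
    }
  }

  // Hypothetical mini "container": construct, set the field, then call init.
  static Listener create(String topicValue) throws ReflectiveOperationException {
    Listener bean = new Listener();
    Field field = Listener.class.getDeclaredField("topic");
    field.setAccessible(true);
    field.set(bean, topicValue);
    bean.init();
    return bean;
  }

  public static void main(String[] args) throws Exception {
    Listener bean = create("events");
    System.out.println("constructor saw: " + bean.seenInConstructor); // null
    System.out.println("init saw: " + bean.seenAfterInjection);       // events
  }
}
```

Passing the constructor's null to Objects.requireNonNull, as ImmutableTag does in the stack trace, is what raises the NullPointerException.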

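If it is annotated with @Value, you cannot use it in the constructor, because Spring injects annotated fields only after the constructor has returned; create the meter in a @PostConstruct method instead.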
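
A minimal sketch of that change, assuming topic is a field injected from the same property the listener uses (the question does not show how topic is declared). The recordTravelTime helper is a hypothetical addition that shows one way the listener method could use the timer; the javax.annotation import matches the Java 8 stack trace above, and newer Spring Boot versions would use jakarta.annotation.PostConstruct instead.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

  private final MeterRegistry meterRegistry;

  // Assumption: the topic is field-injected; it is null until injection completes.
  @Value("${spring.kafka.consumer.properties.topic}")
  private String topic;

  private Timer eventTimer;

  public KafkaConsumer(MeterRegistry meterRegistry) {
    // Only keep the registry here; @Value fields are not set yet.
    this.meterRegistry = meterRegistry;
  }

  @PostConstruct
  void initTimer() {
    // Called after dependency injection, so topic now has its value.
    eventTimer =
        Timer.builder("travel.time")
            .description("The time it takes for the event to travel.")
            .tag("topic", topic)
            .publishPercentiles(0.30, 0.65, 0.99)
            .publishPercentileHistogram()
            .minimumExpectedValue(Duration.ofMillis(1))
            .maximumExpectedValue(Duration.ofMillis(5000))
            .register(meterRegistry);
  }

  // Hypothetical helper the @KafkaListener method could call with the record timestamp.
  void recordTravelTime(long eventTimestampMillis) {
    long elapsed = Math.max(0, System.currentTimeMillis() - eventTimestampMillis);
    eventTimer.record(elapsed, TimeUnit.MILLISECONDS);
  }
}
```

Another option that avoids the callback is to take the value as a constructor parameter, for example `@Value("${spring.kafka.consumer.properties.topic}") String topic`, since constructor arguments are resolved before the constructor runs. Either way the listener method does not need the registry itself; it can use the eventTimer field directly.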
