Verifying Kafka producer message delivery

Question

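This question builds on the discussion here: How to verify sprng kafka producer has successfully sent message or not?. Below is the code I use to check whether the Kafka producer can send a record to the intended topic. To check whether an exception is raised, I used the name of a topic that does not exist at all.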

@RestController
public class TestController {

    @Autowired
    MailProcessor processor;
    
    private static final Logger logger = LoggerFactory.getLogger(TestController.class);
    
    @GetMapping(path = "/mailman/{command}")
    public void testApp(@PathVariable("command") String action) {
        
        try {
            
            Envelope message = new Envelope();
            message.setAction(action);
            message.setValue("this is the sample message for testing purpose only");
            
            processor.sendMessage("notAvailableTopic",message);
            
        } catch (Exception e) {
            logger.error("Exception in the test controller",e);
            
        }
    }
    
}

Here is the implementation of the method:

public void sendMessage(String topic, Envelope message) {

    try {
        ListenableFuture<SendResult<String, Envelope>> future = kafkaTemplate.send(topic, message);
        SendResult<String, Envelope> result = future.get(65000, TimeUnit.MILLISECONDS);

        logger.info("Successful delivery of {}", result.getProducerRecord());

    } catch (Exception ex) {
        logger.error("Exception while sending to {} topic", topic, ex);
    }

}

The kafkaTemplate instance is configured as follows:

@Bean
public List<String> consumerBootstrapServers(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
    return Arrays.asList(bootstrapServers.split(","));
}

@Bean
public ProducerFactory<String, Envelope> producerFactory(List<String> consumerBootstrapServers) {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, Envelope> kafkaTemplate(ProducerFactory<String, Envelope> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

As stated in the previous post, get() takes 60 seconds to fail, so I block the calling thread for 65 seconds. This is the log output I see:

2020-08-12 16:58:35.273  INFO 11471 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-12 16:58:35.273  INFO 11471 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-12 16:58:35.273  INFO 11471 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597269515273
2020-08-12 16:58:35.466  WARN 11471 --- [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Error while fetching metadata with correlation id 2 : {notAvailableTopic=LEADER_NOT_AVAILABLE}
2020-08-12 16:58:35.467  INFO 11471 --- [ad | producer-4] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-4] Cluster ID: KQOZN8MkRVqke4J4H8PDpA
2020-08-12 16:58:35.879  INFO 11471 --- [nio-8080-exec-1] c.w.gioda.po.worker.KafkaProducer        : Successful delivery of ProducerRecord(topic=notAvailableTopic,partition=null,headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__,value = [99,111,109,46,119,97,108,114,116,98,115,103,105,100,112,101,69,110,118,101])],isReadOnly = true),key=null,value=Envelope [action=updateService,value=this is the sample message for testing purpose only],timestamp=null)
2020-08-12 17:00:17.984  INFO 11471 --- [uterTopic-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-postOfficeGrp-7,groupId=postOfficeGrp] Node 244026236 was unable to process the fetch request with (sessionId=1472063313,epoch=179): FETCH_SESSION_ID_NOT_FOUND.
2020-08-12 17:00:18.655  INFO 11471 --- [uterTopic-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-postOfficeGrp-7,groupId=postOfficeGrp] Node 1712770852 was unable to process the fetch request with (sessionId=1493387199,epoch=179): FETCH_SESSION_ID_NOT_FOUND.
2020-08-12 17:00:20.485  INFO 11471 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-postOfficeGrp-8,groupId=postOfficeGrp] Node 457669866 was unable to process the fetch request with (sessionId=1173363358,epoch=179): FETCH_SESSION_ID_NOT_FOUND.

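The log statement in the catch block is never printed. How can I verify whether the message was successfully sent to the Kafka topic? Am I missing something?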

Answer

Please provide a complete test case.

I get the expected error...

@SpringBootApplication
public class So63385353Application {

    public static void main(String[] args) {
        SpringApplication.run(So63385353Application.class,args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String,String> template) {
        return args -> {
            try {
                template.send("missing","foo").get(10,TimeUnit.SECONDS);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}

spring.kafka.producer.properties.max.block.ms=5000

2020-08-13 09:49:08.653 ERROR 14921 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='foo' to topic missing:

org.apache.kafka.common.errors.TimeoutException: Topic missing not present in metadata after 5000 ms.

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic missing not present in metadata after 5000 ms.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:573)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)
    at com.example.demo.So63385353Application.lambda$0(So63385353Application.java:22)
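
In the stack trace above, the KafkaException is raised inside KafkaTemplate.send itself, before get() is ever reached, so a caller may need to handle both a synchronous throw from send and a failed or timed-out future. The following is a minimal sketch of that handling using only the JDK: a CompletableFuture stands in for the future returned by the template, and the names SendVerification, Outcome and awaitSend are hypothetical, not part of Spring Kafka. As a side note, a LEADER_NOT_AVAILABLE warning followed by a successful send, as in the question's log, is what one typically sees when the broker auto-creates topics (auto.create.topics.enable=true); whether that applies to the asker's broker is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class SendVerification {

    /** Hypothetical outcome type for this sketch; not a Kafka or Spring class. */
    public enum Outcome { DELIVERED, FAILED, TIMED_OUT }

    /**
     * Runs the send and waits for its result.
     * The Supplier stands in for a call such as kafkaTemplate.send(topic, message).
     */
    public static Outcome awaitSend(Supplier<CompletableFuture<String>> send, long timeoutMs) {
        CompletableFuture<String> future;
        try {
            // The send call itself may throw (for example when metadata is unavailable).
            future = send.get();
        } catch (RuntimeException e) {
            return Outcome.FAILED;
        }
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.DELIVERED;
        } catch (ExecutionException e) {
            // The asynchronous send failed; e.getCause() carries the underlying error.
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            // No result arrived within the wait; delivery is unknown, not confirmed.
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        // Simulated successful send.
        System.out.println(awaitSend(() -> CompletableFuture.completedFuture("ok"), 100));

        // Simulated asynchronous failure.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated send failure"));
        System.out.println(awaitSend(() -> failed, 100));

        // Simulated synchronous failure thrown by the send call itself.
        System.out.println(awaitSend(() -> {
            throw new IllegalStateException("simulated metadata timeout");
        }, 100));

        // Simulated send that never completes.
        System.out.println(awaitSend(CompletableFuture::new, 50));
    }
}
```

Returning an outcome (or rethrowing) instead of only logging inside the catch block lets the calling code, such as the controller in the question, react to a failed delivery.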
