如何在kafka中捕获和处理DeserializationException

问题描述

@KafkaListener(topics = "${topics.input}")
public void listener(JsonObj obj) {
    //...
}

如果传入错误的json,则抛出异常。我需要处理这个异常。主要任务:保存抛出异常的消息的键我猜我需要创建一个自定义处理程序实现并替换

ErrorHandlingDeserializer()

在我的配置中:

@Bean
public ConsumerFactory<String,JsonObj> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(),new StringDeserializer(),new ErrorHandlingDeserializer(new JsonDeserializer<>(JsonObj.class)));
}

如果是这样,我不知道如何实现这个处理程序。它应该实现什么接口?或者也许有另一种解决方案? 问题:如何捕捉和处理这个异常?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)