问题描述
我正在使用发送到我的 DefaultKafkaConsumerFactory 上的 ErrorHandlingDeserialiser 处理反序列化错误。
try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(),consumerKeyDeserializer,errorHandlingDeserializer);
}
当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止在函数中返回 null 并返回 MyEvent 的新子类型。我的 KafkaListener 然后可以询问新的子类型。
我有一个@KafkaListener 组件,它监听 ConsumerRecord 如下:
@KafkaListner(....)
public void onMessage(ConsumerRecord<String,MyEvent> record) {
...
...
// if record.value instance of MynewsubType
// I have access to the topic,partition and offset here,so I Could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a Failed record.
}
这是这样做的方式吗?我知道 null 对 Kafka 有特殊意义。
这种子类型方法的缺点是,我必须使用 ErrorHandlingDeserialiser 为每种类型创建一个子类型。
解决方法
不要使用函数;相反,抛出的 DeserializationException
直接传递给容器的 ErrorHandler
。
SeekToCurrentErrorHandler
认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。
有一个提供的 DeadLetterPublishingRecoverer
发送记录。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
和
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters