问题描述
我的消费者配置有 kafka 批量侦听器配置和 @KafkaListener 消费消息列表。我有一个 ConsumerInterceptor,我想为每条记录设置唯一的 id,并将其值存储在映射诊断上下文 (MDC) 中。如果我的 kafka 侦听器使用单个消息,则唯一 id 是正确的。但是我的 kafka 侦听器消耗消息列表,因此 MDC.get("id") 仅获取最后一个值。怎么处理? 我的拦截器;
public class KafkaConsumerInterceptor implements ConsumerInterceptor<String,String>{
@Override
public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) {
ConsumerRecord<String,String> record = consumerRecords.iterator().next();
setId(record.headers().headers("id"));
return consumerRecords;
}
private void setId(Iterable<Header> idHeader) {
String id = UUID.randomUUID().toString();
if (idHeader.iterator().hasNext()) {
Header header = idHeader.iterator().next();
id = new String(header.value(),StandardCharsets.UTF_8);
}
MDC.put("id",id);
}
}
解决方法
您需要将 MDC.put()
移动到处理每条记录的任何位置,您不能为整个批次设置它。