问题描述
我通过 Kafka 收到一条消息,我知道其中包含非 UTC 时区。
当我使用 org.apache.kafka.common.serialization.StringDeserializer
来验证这一点时,我得到了带有时区的 ISO 8601 格式的正确时间戳:
{ "id": "e499f2e8-a50e-4ff8-a9fe-0eaf9d3314bf","sent_ts": "2021-02-04T14:06:10+01:00" }
当我切换到 org.springframework.kafka.support.serializer.JsonDeserializer
时,这会丢失。我的 POJO 看起来像这样:
public class MyMessage {
@JsonProperty("id")
private String id;
@JsonProperty("sent_ts")
private OffsetDateTime sentTs;
@Override
public String toString() {
return "MyMessage{" +
"id='" + id + '\'' +
",sentTs=" + sentTs +
'}';
}
当我记录收到的消息时,我得到:
MyMessage{id='e499f2e8-a50e-4ff8-a9fe-0eaf9d3314bf',sentTs=2021-02-04T13:06:10Z}
我认为 JsonDeserializer
必须使用 Jackson 所以在我设置的 application.yml
配置中:
spring.jackson:
deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false
这没有用。我也试过定制器:
@Configuration
public class ObjectMapperBuilderCustomizer implements Jackson2ObjectMapperBuilderCustomizer {
@Override
public void customize(Jackson2ObjectMapperBuilder builder) {
builder.modules(new JavaTimeModule());
builder.featuresTodisable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
}
}
这也不起作用。
我虽然可能需要成为 Kafka 消费者的属性,所以我也尝试过:
spring:
consumer:
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.jackson.deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false
还是不行。
有没有办法让 JsonDeserializer
正常工作并保持正确的时区偏移?
解决方法
当你喜欢这个 value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
时,该类的实例是由 Apache Kafka 客户端代码创建的,它完全不知道 Spring 配置。
如果您想依赖 Spring Boot 配置的 ObjectMapper
和您的自定义,您应该考虑执行以下操作:
@Bean
DefaultKafkaConsumerFactory kafkaConsumerFactory(KafkaProperties properties,ObjectMapper objectMapper) {
Map<String,Object> consumerProperties = properties.buildConsumerProperties();
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
jsonDeserializer.configure(consumerProperties,false);
return new DefaultKafkaConsumerFactory(consumerProperties,new StringDeserializer(),jsonDeserializer);
}
注意我如何称呼jsonDeserializer.configure(consumerProperties,false);
。这样,您仍然可以在 applicaiton.yml
中为 Kafka 消费者配置其余属性。
请考虑为 Spring Boot 提出 GH 问题,因此我们将修改我们如何处理 JsonDeserializer
和自动配置的 ObjectMapper
,以提供更好的最终用户体验。