问题描述
使用spring Kafka制作产品并收听请求,键值位于“”内
生产者代码
2502.699307245550715093058647
监听器
public record ProductProducer(ReplyingKafkaTemplate<String,Object,Object> _replyTemplate,) implements IProductProducer {
public Productviewmodel Update(Productviewmodel product,String id) throws InterruptedException,ExecutionException,TimeoutException {
RequestReplyFuture<String,Object> future =
this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.UPDATE_PRODUCT,id,product));
LOG.info(future.getSendFuture().get(kafkaConstants.kafkaTimeout,TimeUnit.SECONDS).getRecordMetadata().toString());
Product productDb = (Product) future.get(kafkaConstants.kafkaTimeout,TimeUnit.SECONDS).value();
return new Productviewmodel();
}}
键值位于“”中,如下所示
消费工厂
@KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT,topics = ProductTopicConstants.UPDATE_PRODUCT,containerFactory = "addUpdateProductContainerFactory")
@SendTo
public Object UpdateProduct(ConsumerRecord<String,Productviewmodel> productviewmodel) {
String id = productviewmodel.key();
logger.info("Listening to update product with id :",id);
return new Product();
}
我具有以下配置
@Bean
public ConsumerFactory<String,String> consumerFactoryGetDeleteProduct() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(String.class));
}
解决方法
我的序列化和反序列化工作是在将JSON解析器更改为字符串来完成工作
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer