问题描述
请找到我们需要实现的用例。
首先,我们需要调用Kafka生产者一条消息作为休息服务,他们将处理并在另一个主题中返回响应。
对于我们来说,这是一个请求回复主题,我们需要使用 replykafka 模板正常工作,以便对同一请求进行回复,但是我们可以设置关联ID 。
作为主题消息元数据,存在发送属性,是否有任何方法可以将关联ID与请求主题消息和答复主题消息进行映射。
更好地向您解释。
一个微服务需要有效负载,如下所示,有效负载中带有 correlationId 。
{
"operationDate": "2020-09-16T11:58:25","correlationId": "-5544538377183901824042719876882142227","birthDate": "2013-12-12","firstNameEn": "boby","firstNameAr": "الشيخ",}
{
"correlationId": -5544538377183901824042719876882142227,"consumerId": null,"userid": 123456,"statusCode": "SUCCESS","errors": null
}
现在,我们需要使用Spring ReplyingKafkaTemplate来实现。
因为ReplyingKafkaTemplate仅与标头中的correlationId一起使用
解决方法
假设您要在相关ID中包含主题,请参见
/**
* Set a function to be called to establish a unique correlation key for each request
* record.
* @param correlationStrategy the function.
* @since 2.3
*/
public void setCorrelationIdStrategy(Function<ProducerRecord<K,V>,CorrelationKey> correlationStrategy) {
您可以基于ProducerRecord
(具有topic()
)创建自己的关联ID。
您只需要确保它是唯一的即可。如果您手动设置KafkaHeaders.REPLY_TOPIC
,则该策略将对它可见。
编辑
在有效负载中包含相关ID的情况下,使用setCorrelationIdStrategy
从有效负载中提取correlationId,并添加一个RecordInterceptor
在回复端执行相同的操作。
感谢您的提示。
我已经完成了用Kafka标头relatedId覆盖有效负载的操作。
Root
然后在Replay onMessage中,我将响应有效负载的correlationId填充到了标头中。
@Override
protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
if(producerRecord.value()!=null){
// i have appeneded the header correlationId in th payload
}
return super.doSend(producerRecord);
}
通过这种方式,在请求和响应有效负载中成功地将请求-响应语义与correlationId集成在一起。