在Apache Kafka Java

问题描述

请找到我们需要实现的用例。

首先,我们需要调用Kafka生产者一条消息作为休息服务,他们将处理并在另一个主题中返回响应。

对于我们来说,这是一个请求回复主题,我们需要使用 replykafka 模板正常工作,以便对同一请求进行回复,但是我们可以设置关联ID

作为主题消息元数据,存在发送属性,是否有任何方法可以将关联ID与请求主题消息和答复主题消息进行映射。

更好地向您解释。

一个微服务需要有效负载,如下所示,有效负载中带有 correlationId

{
  "operationDate": "2020-09-16T11:58:25","correlationId": "-5544538377183901824042719876882142227","birthDate": "2013-12-12","firstNameEn": "boby","firstNameAr": "الشيخ",}

微服务将处理有效负载并在另一个主题中给出响应。

{
  "correlationId": -5544538377183901824042719876882142227,"consumerId": null,"userid": 123456,"statusCode": "SUCCESS","errors": null
}

现在,我们需要使用Spring ReplyingKafkaTemplate来实现。

因为ReplyingKafkaTemplate仅与标头中的correlationId一起使用

解决方法

假设您要在相关ID中包含主题,请参见

/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K,V>,CorrelationKey> correlationStrategy) {

您可以基于ProducerRecord(具有topic())创建自己的关联ID。

您只需要确保它是唯一的即可。如果您手动设置KafkaHeaders.REPLY_TOPIC,则该策略将对它可见。

编辑

在有效负载中包含相关ID的情况下,使用setCorrelationIdStrategy从有效负载中提取correlationId,并添加一个RecordInterceptor在回复端执行相同的操作。

,

感谢您的提示。

我已经完成了用Kafka标头relatedId覆盖有效负载的操作。

Root

然后在Replay onMessage中,我将响应有效负载的correlationId填充到了标头中。

@Override
    protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
        if(producerRecord.value()!=null){
           // i have appeneded the header correlationId in th payload
        }
        return super.doSend(producerRecord);
    }

通过这种方式,在请求和响应有效负载中成功地将请求-响应语义与correlationId集成在一起。