问题描述
在 JHipster 版本 6.6.0 中,Kafka 使用模型已从标准的 Producer/Consumer 类更改为 WebResource 级别。没有真实的例子,这种变化有什么好处,以及这种变化可以如何在实际应用中使用。
假设我们有服务 A 和服务 B。这两个服务之间的通信必须通过 Kafka 事件来完成。
问题是 - 我必须做什么服务 B 开始监听来自 服务 A 主题的事件。在当前配置中,看起来我必须手动触发 /consumes
端点,但这没有意义,因为我期望在应用程序启动并运行后该服务将开始侦听指定的主题列表.
如果您对此主题有任何评论,以帮助我理解这一点,我将不胜感激。
示例: jhipster 7.1.0 生成此资源:
服务 A - 网关
package com.stukans.refirmware.gateway.web.rest;
import com.stukans.refirmware.gateway.config.KafkaProperties;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
@RestController
@RequestMapping("/api/gateway-kafka")
public class GatewayKafkaResource {
private final Logger log = LoggerFactory.getLogger(GatewayKafkaResource.class);
private final KafkaProperties kafkaProperties;
private KafkaSender<String,String> sender;
public GatewayKafkaResource(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
this.sender = KafkaSender.create(SenderOptions.create(kafkaProperties.getProducerProps()));
}
@PostMapping("/publish/{topic}")
public Mono<PublishResult> publish(
@PathVariable String topic,@RequestParam String message,@RequestParam(required = false) String key
) {
log.debug("REST request to send to Kafka topic {} with key {} the message : {}",topic,key,message);
return Mono
.just(SenderRecord.create(topic,null,message,null))
.as(sender::send)
.next()
.map(SenderResult::recordMetadata)
.map(
Metadata ->
new PublishResult(Metadata.topic(),Metadata.partition(),Metadata.offset(),Instant.ofEpochMilli(Metadata.timestamp()))
);
}
@GetMapping("/consume")
public Flux<String> consume(@RequestParam("topic") List<String> topics,@RequestParam Map<String,String> consumerParams) {
log.debug("REST request to consume records from Kafka topics {}",topics);
Map<String,Object> consumerProps = kafkaProperties.getConsumerProps();
consumerProps.putAll(consumerParams);
consumerProps.remove("topic");
ReceiverOptions<String,String> receiverOptions = ReceiverOptions.<String,String>create(consumerProps).subscription(topics);
return KafkaReceiver.create(receiverOptions).receive().map(ConsumerRecord::value);
}
private static class PublishResult {
public final String topic;
public final int partition;
public final long offset;
public final Instant timestamp;
private PublishResult(String topic,int partition,long offset,Instant timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
}
}
}
服务 B - 代理
package com.stukans.refirmware.agent.web.rest;
import com.stukans.refirmware.agent.config.KafkaProperties;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
@RestController
@RequestMapping("/api/agent-kafka")
public class AgentKafkaResource {
private final Logger log = LoggerFactory.getLogger(AgentKafkaResource.class);
private final KafkaProperties kafkaProperties;
private KafkaSender<String,String> sender;
public AgentKafkaResource(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
this.sender = KafkaSender.create(SenderOptions.create(kafkaProperties.getProducerProps()));
}
@PostMapping("/publish/{topic}")
public Mono<PublishResult> publish(
@PathVariable String topic,Instant timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
}
}
}
这是唯一可用的与 Kafka 相关的代码。
在 6.6.0 版本之前,JHipster 生成了标准的生产者/消费者类,我可以用它来定义要收听的主题。现在还不清楚如何使用生成的代码来发出/监听事件。
解决方法
首先,由于我们看不到您的代码,我们不知道它是如何(或应该)如何工作的...
在任何情况下,您都不应该通过 REST“触发”消费者;它们应该在服务启动时自动启动。如果没有消息,那么他们在后台轮询并空闲,直到“生产者服务”向主题推送消息。