问题描述
我正在使用Micronaut应用程序通过apache Kafka接收和返回反应类型。来自Micronaut文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/
下面有一种接收和返回反应类型的方法
此方法位于使用者(侦听器端)
@Topic("reactive-products")
public Single<Product> receive(
@KafkaKey String brand,Single<Product> productFlowable) {
return productFlowable.doOnSuccess((product) ->
System.out.println("Got Product - " + product.getName() + " by " + brand)
);
}
是否有使用此方法的示例,生产者端将如何处理返回值?
解决方法
在此示例中,当返回响应式类型时,Micronaut 框架会创建对 Single<Product>
的订阅。
您不会编写任何代码来“使用”上述方法,因为这是由框架处理的。上面的代码是一个 kafka 消费者实现,当从配置的 Product
的 kafka 代理获取消费者记录时,将收到 @Topic
实例。
关于生成消息,您可以参考这里:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaClient。