接收和返回带有micronaut 2.1.3的反应型apache kafka

问题描述

我正在使用Micronaut应用程序通过apache Kafka接收和返回反应类型。来自Micronaut文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/

下面有一种接收和返回反应类型的方法

方法位于使用者(侦听器端)

@Topic("reactive-products")
public Single<Product> receive(
        @KafkaKey String brand,Single<Product> productFlowable) { 
    return productFlowable.doOnSuccess((product) ->
            System.out.println("Got Product - " + product.getName() + " by " + brand) 
    );
}

是否有使用此方法的示例,生产者端将如何处理返回值?

解决方法

在此示例中,当返回响应式类型时,Micronaut 框架会创建对 Single<Product> 的订阅。

您不会编写任何代码来“使用”上述方法,因为这是由框架处理的。上面的代码是一个 kafka 消费者实现,当从配置的 Product 的 kafka 代理获取消费者记录时,将收到 @Topic 实例。

关于生成消息,您可以参考这里:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaClient