使用Apache Kafka Java和Micronaut应用程序进行Rest API设计

问题描述

enter image description here

在使用Micronaut框架进行Rest API CRUD操作时,我具有上面的图。我有一个单向流,控制器需要从Kafka使用者API中知道操作的执行情况。

例如

  1. 要从数据库获取所有商品列表,请在消费者级别执行
  2. 在消费者级别添加/更新/删除操作

我在消费者级别(监听器)具有下面的Micronaut反应代码,以便从mongo数据库获取产品列表

@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product> findByFreeText(String text) {
    LOG.info(String.format("Listener --> Listening value = %s",text));
    return Flowable.frompublisher(repository.getCollection("product",Product.class)
            .find(new Document("$text",new Document("$search",text)
                            .append("$caseSensitive",false)
                            .append("$diacriticSensitive",false)
            )));
}

生产者界面

@KafkaClient
public interface IProductProducer {
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
        Flowable<Product> findFreeText(String text);
}

生产者的实现

@Override
    public Flowable<Productviewmodel> findFreeText(String text) {
        LOG.info("Manager --> Finding all the products");
        List<Productviewmodel> model = new ArrayList<>();
        iProductProducer.findFreeText(text).subscribe(item -> {
            System.out.println(item);
        },error ->{
            System.out.println(error);
        });
        return Flowable.just(new Productviewmodel());
    }

根据Micronaut文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods

根据Micronaut文档接收和返回反应类型

@Topic("reactive-products")
public Single<Product> receive(
        @KafkaKey String brand,Single<Product> productFlowable) { 
    return productFlowable.doOnSuccess((product) ->
            System.out.println("Got Product - " + product.getName() + " by " + brand) 
    );
}

在生产者实现中,订阅时我从未收到任何值。

iProductProducer.findFreeText(text).subscribe(item -> {
                System.out.println(item);
            },error ->{
                System.out.println(error);
            });

我想知道以上流程是否是使用kafka执行REST API CRUD操作的正确方法

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)