两个Quarkus服务之间的非阻塞数据流Java中带有Mutiny的Vert.x

问题描述

更新!

解决了一些与主要问题无关的问题之后,我修复了示例代码中的一些小错误,主要问题仍然是关于服务之间的非阻塞流传输。

背景信息

我要在Quarkus下移植Spring WebFlux服务。该服务对多个巨大的数据集进行长时间搜索,并在可用时以Flux(文本/事件流)返回部分结果。

问题

现在,我正在尝试在Quarkus下将Mutiny Multi与Vert.x结合使用,但无法弄清楚消费者服务如何在不阻塞的情况下接收此流。

在所有示例中,使用者要么是JS前端页面,要么生产者的内容类型是application / json,直到Multi完成后才在一个JSON对象中发送它(在我的应用程序中没有意义)。

问题

  1. 如何使用带有Mutiny风格的Vert.x WebClient接收文本/事件流?
  2. 如果问题在于WebClient无法接收连续的流:在两个Quarkus服务之间流传输数据的标准方法是什么?

这是一个简化的示例

测试实体

public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString,getters and setters
    
}

生产者1.简单的无限流->挂起

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2)              .onItem().transform(n -> new SearchResult(n.toString()));
}

生产者2,具有Vertx路径无限流->挂起

@Route(path = "/routed",methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
        log.info("routed run");
        return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(n -> new SearchResult(n.toString()));
}

生产者3.简单有限流->阻塞直到完成

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
        .transform().byTakingFirstItems(5)
        .onItem().transform(n -> new SearchResult(n.toString()));
}

消费者

在生产者和消费者方面都尝试了多种不同的解决方案,但是在每种情况下,流都会阻塞,直到流完成为止,或者无限期地挂起,而不会传输无限流的数据。我用httpie得到了相同的结果。这是最新的迭代:

WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx,webClientOptions);
        
client.get("/string")
                .send()
                .onFailure().invoke(resp -> log.error("error: " + resp))
                .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
                .toMulti()
                .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(),r.bodyAsstring())));

解决方法

Vert.x Web客户端不适用于SSE(无配置)。 来自https://vertx.io/docs/vertx-web-client/java/

响应已完全缓冲,请使用BodyCodec.pipe将响应传递到写入流

它等待直到响应完成。您可以使用 raw Vert.x HTTP客户端,也可以使用pipe编解码器。在https://vertx.io/docs/vertx-web-client/java/#_decoding_responses上给出了示例。

或者,您可以使用SSE客户端,例如: https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...