问题描述
我正在尝试使用Reactor Netty连接到在Docker容器上运行的消息队列。我是独立运行的,因为存在依赖性问题,所以不使用SpringFlux。
从Reactor Netty文档中的示例中,我看到了一种连接到服务器并获得响应的方法:
public static void main(String[] args) {
String response =
HttpClient.create()
.headers(h -> h.add("my header",my_header)
.get()
.uri(my_uri)
.responseContent()
.aggregate()
.asstring()
.block();
}
但是当我以后尝试通过System.out.println()显示输出时,什么也没发生。
我还试图了解如何使用:
Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)
但是我不确定该怎么做。 我在文档中看到一个名为Connection的类,该类使用TCPClient并具有方法subscription。
我有点迷茫,您能否指出我在不使用spring-flux的情况下在Reactor Netty中实现此目标的正确方向?
谢谢
编辑:
经过一些实验,我得到了:
private disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header",my_header)
.get()
.uri(my_uri)
.response((res,bytes) - > {
System.out.println(bytes.asstring());
return bytes.asstring();})
.subscribe();
}
这给了我一个FluxHandle,如何使用它实际读取响应的正文?
解决方法
因此,我想出了如何使用jackson
库订阅和读取从服务器接收到的数据,甚至将数据转换为JSON的方法,以方便我的代码读取。
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header",my_header)
.get()
.uri(my_uri)
.response((resp,bytes) -> {
return bytes.asString();
})
.subscribe(response -> {
try {
consumeData(new ObjectMapper()
.readValue(response,MyData.class));
} catch (IOException ex) {
System.out.println("ERROR converting to json: " + ex);
}
});
}
看来,当使用subscribe()方法时,我可以监听传入的响应并对其进行处理。我仍然需要添加一种在服务器停止或消息队列关闭时关闭连接的方法,以使客户端不会挂在不存在的消息队列上。