使用 Reactor Netty 客户端一次读取所有大型 JSON 有效负载,而不是分块读取,以保持长期持续连接

问题描述

为了描述我正在尝试解决/实现的用例,我们与第 3 方服务集成,该服务通过 HTTP/1.1 长寿命/持久连接(可以保持最多打开 2-3 小时,这是另一个问题,关于连接池在这种情况下的行为方式,因为所有通过 HTTP 客户端的连接都只到该主机)。但是我目前遇到的问题是,我连接并订阅了如下所示的数据

 client
            .get()
            .uri("/")
            .responseContent()
            .asstring()
             // .aggregate() // 1
            .subscribe(content -> {
                  logger.info("Running content on thread {}: {}",Thread.currentThread().getName(),content);
            });

对于小 JSON,它工作正常,对于大 JSON,content 包含完整 JSON 的一部分,因此它本身不是有效的 JSON 或完整数据。无论一个块的大小如何,是否都可以始终获取整个数据(有没有办法设置大 ByteBuf 大小以将任何 JSON 有效负载放入一个响应中?)或者如果没有,您如何等待和将同一 JSON 响应的多个部分组合成完整的有效 JSON?

举个例子,我的意思是(我在例子中减少了 JSON 负载的大小,但只是为了演示这个想法),说服务器发送以下 JSON:

{"id":1,data: [1,2,3]}

在客户端,我以 3 个块获得数据/响应,即

  • 块 1:{"id:1",dat
  • 块 2:a: [1,2
  • 块 3:,3]}

(然后在块 3 之后,我在日志中看到 READ COMPLETE,见下文)。

如果我启用 aggregate(),则根本不会打印任何内容,据我所知,它会等待连接关闭,但由于它是一个持久的长期连接,因此这将不起作用。

有趣的部分是,如果在客户端启用 .wiretap(true),当大型 JSON 被拆分为多个 READ COMPLETE-s 时,它会在日志中打印 ByteBuf,只有当完整的 JSON 内容被消耗(不是在每个单独的 ByteBuf 部分之后),这可能意味着客户端知道来自服务器的单个数据响应的处理何时结束。

我发现有 .httpResponseDecoder,因为看起来客户端将块限制为 16kb,虽然我将其扩展到 2mb,即 .httpResponseDecoder(spec -> spec.maxChunkSize(2 * 1024 * 1024)),但仍然没有将整个 JSON 作为单个数据块,仍然看到每个块最大 16kb 的限制。

知道如何实现这一目标或在哪里查看吗?

解决方法

在 Gitter https://gitter.im/reactor/reactor-netty 中,Violeta Georgieva 建议尝试使用 JsonObjectDecoder,向客户端添加以下内容:

client.doOnConnected(connection -> connection.addHandler(new JsonObjectDecoder()))

现在在每个订阅处理中获得正确的 JSON。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...