将 JSON 从 REST API 流式传输到 Express 应用程序的最佳方法是什么?

问题描述

我有一个 moleculer-based 微服务,它有一个端点可以输出一个大型 JSON 对象(大约数万个对象)

这是一个结构化的 JSON 对象,我事先知道它会是什么样子。

[ // ... tens of thousands of these
  {
    "fileSize": 1155624,"name": "Gyo v1-001.jpg","path": "./userdata/expanded/Gyo v01 (2003)"
  },{
    "fileSize": 308145,"name": "Gyo v1-002.jpg","path": "./userdata/expanded/Gyo v01 (2003) (Digital)"
  }
  // ... tens of thousands of these
]

我开始研究 JSON 流,并在那里取得了一些进展,因为我知道如何使用 NodeJS ReadableStream 客户端。我知道我可以使用 oboe 来解析 JSON 流。

为此,这是我基于 Express 的应用程序中的代码


router.route("/getComicCovers").post(async (req: Request,res: Response) => {
  typeof req.body.extractionoptions === "object"
    ? req.body.extractionoptions
    : {};
  oboe({
    url: "http://localhost:3000/api/import/getComicCovers",method: "POST",body: {
      extractionoptions: req.body.extractionoptions,walkedFolders: req.body.walkedFolders,},}).on("node",".*",(data) => {
    console.log(data);
    res.write(JSON.stringify(data));
  });
});

这是moleculer中的端点

getComicCovers: {
    rest: "POST /getComicCovers",params: {
        extractionoptions: "object",walkedFolders: "array",async handler(
        ctx: Context < {
            extractionoptions: IExtractionoptions;
            walkedFolders: IFolderData[];
        } >
    ) {
        
        const comicBooksForImport = await getCovers(
            ctx.params.extractionoptions,ctx.params.walkedFolders
        );

// comicBooksForImport is the aforementioned array of objects.
// How do I stream it from here to the Express app object-by-object?

        
    },

我的问题是:如何将这个巨大的 JSON 从 REST 端点流式传输到 Express 应用程序,以便我可以在客户端解析它?

更新

根据@JuanCaicedo 的建议,我采用了 socket.io 实现。我在服务器端和客户端都设置了它。

但是,这段代码确实有问题

map(
    walkedFolders,async (folder,idx) => {
        let foo = await extractArchive(
            extractionoptions,folder
        );

        let fo =
            new JsonStreamStringify({
                foo,});

        fo.pipe(res);
        if (
            +idx ===
            walkedFolders.length - 1
        ) {
            res.end();
        }
    }
);

我收到一个 Error [ERR_STREAM_WRITE_AFTER_END]: write after end 错误。我知道发生这种情况是因为响应在下一次迭代尝试将 foo(它是一个流)的更新值通过管道传输到响应之前终止。

我该如何解决这个问题?

解决方法

您是寻求一般方法建议,还是寻求对您拥有的特定解决方案的支持?

如果是第一个,那么我认为在服务器和客户端之间进行通信的最佳选择是通过 websockets,也许是像 Socket.io 这样的东西。长期连接将在这里为您提供良好的服务,因为传输您的所有数据需要很长时间。

然后您可以随时将数据从服务器发送到客户端。此时,您可以将服务器上的数据作为 node.js 流读取并一次发送一个数据。

使用双簧管并在每个节点上写入响应的问题在于它需要长时间运行的响应,并且在您发送所有数据之前连接很可能会中断。