pg-promise:对查询流中的行运行从属查询会耗尽内存

问题描述

在我的应用程序中,我需要为查询流中的每一行运行一个依赖更新(该查询流具有〜60k行),无论我尝试什么它都会耗尽内存,即使我期望流会请允许我保持较低的内存使用率。

在大量阅读和重新阅读了SO之后,pg-promise Wiki的各个页面,以及以不同的方式来实现这一点,我得出了以下内容(简化了我的代码):

try {
    await db.tx("test-tx",async tx => {
        const qs = new QueryStream(`SELECT s.a AS i FROM GENERATE_SERIES(1,100000) AS s(a)`);
        let count = 0;
        const startTime = new Date();
        const result = await tx.stream(qs,stream => {
            return pgp.spex.stream.read(
                stream,async (i,data) => {
                    // console.log(`handling ${i}: ${JSON.stringify(data)}`);
                    await innerQuery(tx,count++,startTime);
                },{ readChunks: true }
            );
        });
        console.log("stream done",result);
    });
    console.log("transaction done");
} catch (error) {
    console.error(error);
} finally {
    db.client.$pool.end();
}

async function innerQuery(tx,count,startTime) {
    if (count % 10000 === 0) {
        const duration = Math.round((new Date() - startTime) / 1000);
        const mb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
        console.log(`row ${count},${mb}MB,${duration} seconds`);
    }
    await tx.one("SELECT 1");
    console.log(`inner query ${count} done`);
}

这将按预期运行查询,但是内存使用量却不断增加

row 0,12MB,0 seconds
row 10000,68MB,1 seconds
row 20000,124MB,2 seconds
row 30000,184MB,2 seconds
row 40000,241MB,3 seconds
row 50000,300MB,4 seconds
row 60000,357MB,5 seconds
row 70000,417MB,6 seconds
row 80000,476MB,6 seconds
row 90000,533MB,7 seconds
stream done { processed: 100000,duration: 8062 }
inner query 0 done
inner query 1 done
inner query 2 done
inner query 3 done
inner query 4 done
inner query 5 done
inner query 6 done
inner query 7 done
...
inner query 99999 done
transaction done

现在,这里出现了一些问题:请注意,tx.stream调用在所有内部查询解析之前返回,并输出到控制台。这解释了内存问题,所有这些闭包和承诺(其中的10万个)都以某种方式在内存中等待流完成,以便它们自己可以解决并被GC。

一个数据点:如果我从最高级别的db.tx更改为db.task,则在关闭连接之前仅运行一个或两个内部查询,进一步的查询会导致错误({ {1}}。

我也尝试过使用Querying against a released or lost connection.并使用tx.batch进行readChunks: false调用,但这只是在单批处理后暂停并锁定了。

那么我在做什么错了?我如何才能立即解决内部查询,以便GC可以逐步回收内存?

解决方法

据我所知,没有明显的方法可以减慢查询流以等待某些相关查询完成。新的内部查询的创建速度与结果的流式传输速度一样快,因此内存消耗达到了顶峰。

我找到了一个不使用 QueryStream 的解决方案。这使用了一个服务器端游标,意味着所有的查询都是串行运行的。尚未探索尝试并行运行这些块以提高吞吐量,但它确实解决了内存问题。

const startTime = new Date();
await db.tx("test-tx",async tx => {
    await tx.none(`
        DECLARE test_cursor CURSOR FOR
        SELECT s.a AS i FROM GENERATE_SERIES(1,100000) AS s(a)`);
    let row;
    while ((row = await tx.oneOrNone("FETCH NEXT FROM test_cursor"))) {
        await innerQuery(tx,row.i,startTime);
    }
    await tx.none("CLOSE test_cursor");
    console.log("outer query done");
});
console.log(`transaction done: ${memUsage()}MB,${duration(startTime)} seconds`);

这输出类似

row 10000: 9MB,5 seconds
inner query 10000 done
row 20000: 8MB,9 seconds
inner query 20000 done
row 30000: 10MB,14 seconds
inner query 30000 done
row 40000: 9MB,19 seconds
inner query 40000 done
row 50000: 11MB,23 seconds
inner query 50000 done
row 60000: 10MB,28 seconds
inner query 60000 done
row 70000: 8MB,33 seconds
inner query 70000 done
row 80000: 11MB,38 seconds
inner query 80000 done
row 90000: 9MB,43 seconds
inner query 90000 done
row 100000: 12MB,48 seconds
inner query 100000 done
outer query done
transaction done: 12MB,48 seconds