问题描述
我有一个脚本,该脚本将从服务器下载数千个文件,对这些文件执行一些cpu密集型计算,然后将结果上载到某个地方。为了增加复杂性,我想限制到我要下载文件的服务器的并发连接数。
为了从事件线程中获取cpu密集型计算,我利用了workerpool by josdejong。我还认为我可以利用这样一个事实,即在任何给定时间仅会旋转有限数量的线程来限制与服务器的并发连接数,因此我尝试将网络I / O置于工作进程中,例如所以(TypeScript):
import Axios from "axios";
import workerpool from "workerpool";
const pool = workerpool.pool({
minWorkers: "max",});
async function processData(file: string) {
console.log("Downloading " + file);
const csv = await Axios.request<IncomingMessage>({
method: "GET",url: file,responseType: "stream"
});
console.log(csv);
// Todo: Will process the file here
}
export default async function (files: string[]) {
const promiseArray: workerpool.Promise<Promise<void>>[] = [];
// Only processing the first file for Now during testing
files.slice(0,1).forEach((file) => {
promiseArray.push(pool.exec(processData,[file]));
});
await Promise.allSettled(promiseArray);
await pool.terminate();
}
当我编译并运行此代码时,看到消息“正在下载test.txt”,但是此后,我看不到以下日志语句(console.log(csv)
)
我已经尝试过对此代码进行各种修改,包括删除responseType
,删除await
以及仅检查Axios返回的Promise
,使函数不异步等。无论似乎总是在Axios.request
行上崩溃
工作线程是否无法打开HTTP连接?还是我只是犯了一个愚蠢的错误?
解决方法
如果没有到达这一行代码:
console.log(csv);
然后,要么Axios.request()
从未履行其承诺,要么该承诺正在拒绝。这些功能中的任何一个都没有错误处理,因此,如果它拒绝了,您将不会知道也不会在记录问题。首先,我建议您对代码进行检测,以便记录任何拒绝:
async function processData(file: string) {
try {
console.log("Downloading " + file);
const csv = await Axios.request<IncomingMessage>({
method: "GET",url: file,responseType: "stream"
});
console.log(csv);
} catch(e) {
console.log(e); // log an error
throw e; // propagate rejection/error
}
}
作为代码设计的一般要点,您应该在某种程度上捕获并记录任何可能的promise拒绝。您不必在最低的调用级别上全部捕获它们,因为它们会通过返回的Promise传播出去,但是您确实需要捕获任何可能的拒绝,并且出于自己的理智,您需要对其进行记录,以便您可以看看它何时发生以及错误是什么。
,您不能在辅助线程中执行TypeScript。 pool.exec
方法接受静态JavaScript函数或具有相同函数的JavaScript文件的路径。
这里是workerpool readme的引文:
请注意,函数和参数都必须是静态且可字符串化的,因为它们需要以序列化形式发送给工作程序。对于大型函数或函数参数,将数据发送给工作程序的开销可能很大。
我正在尝试使用TypeScript进行这项工作。解决此问题的可能方法是:
- 在TypeScript中编写一个辅助函数,使用任何捆绑程序将其编译为单独的捆绑包,然后将已编译文件的路径传递给
pool.exec
。我设法完成了这项工作,但唯一令我不满意的是,使用此解决方案,您将无法使用nodemon(如果使用) - 使用JS包装器来编译TS源代码,并使用ts-node执行它。然后,将该包装的路径传递给
pool.exec
函数。此解决方案不适用于捆绑器