问题描述
我正在使用 express 和 busboy 实现上传端点,目前代码运行得非常顺利。现在我想实现可恢复(断点续传)上传的逻辑。例如,用户正在上传 10GB 或 50GB 的文件时网络断开了,那么下次他们上传同一个文件时,应该从中断的位置继续上传。
我明白我需要实现另一个端点,它应该告诉客户端到目前为止上传了多少字节,以便客户端可以发送剩余的字节。
我不知道如何从这里开始,因为我在这里面临的第一个问题是,当上传发生时,express 将临时文件上传到操作系统的 tmp 目录中。是否可以将临时文件上传到我当前的脚本目录中?
// Streams the uploaded file part to disk next to this script and replies with
// the stored file name once the data has been flushed.
router.post("/upload", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });
  let failed = false;

  // Report the first failure through Express' error pipeline, exactly once.
  const fail = (err) => {
    if (failed) {
      return;
    }
    failed = true;
    next(err);
  };

  busboy.on("file", (fieldname, file, filename) => {
    // The name is chosen by the client: keep only its last path segment so a
    // value such as "../../app.js" cannot write outside this directory.
    const safeName = path.basename(filename);
    const filepath = path.join(__dirname, safeName);
    const writeStream = fs.createWriteStream(filepath);

    writeStream.on("error", (err) => {
      // Keep draining the part so the multipart parser is not left stalled
      // behind a destination that no longer accepts data.
      file.resume();
      fail(err);
    });
    writeStream.on("close", () => {
      if (!failed) {
        res.send(safeName);
      }
    });
    file.pipe(writeStream);
  });

  busboy.on("error", fail);
  req.pipe(busboy);
});
解决方法
回答我自己的问题。
我设法拼凑出了一个可行的解决方案。基本思路是:从上次上传中断的位置继续上传文件。我不确定这是否是最好的处理方式,但它对我有用。
server.js
const express = require("express");
const busboy = require("connect-busboy");
const path = require("path");
const fs = require("fs");
const cors = require("cors");
const app = express(); // Initialize the express web server
app.use(cors());
// Insert the busboy middleware once, with the tuned buffer size. A second,
// option-less `app.use(busboy())` used to follow this call and was redundant.
app.use(
  busboy({
    highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
  })
);
// Directory that receives the (partial) uploads.
const uploadPath = path.join(__dirname, "upload_data");
// createWriteStream does not create missing parent directories, so make sure
// the target exists before the first upload arrives.
fs.mkdirSync(uploadPath, { recursive: true });
// In-memory map of client file id -> stored file name (lost on restart).
// Created without a prototype so that client-chosen ids such as "constructor"
// or "__proto__" cannot resolve to inherited Object properties.
const database = Object.create(null);
// Tells the client how many bytes of the file identified by :id have been
// stored so far. Responds with { bytes: 0 } when the id is unknown or the
// file cannot be inspected (meaning nothing has been uploaded yet).
app.route("/:id").get((req, res, next) => {
  const fileId = req.params.id;
  const dbFileName = database[fileId];
  let bytes = 0;

  if (typeof dbFileName === "string") {
    try {
      // statSync reads the size without opening the file; the previous
      // openSync + fstatSync pair never closed its descriptor and leaked one
      // per request.
      bytes = fs.statSync(path.join(uploadPath, dbFileName)).size;
    } catch (error) {
      // Best effort: a missing or unreadable file is reported as 0 bytes.
      console.error(error);
    }
  }

  return res.json({ bytes });
});
// Handle the upload post request.
//
// Request headers read by this handler:
//   x-file-id    - client-chosen id used to find a previous partial upload
//   x-start-byte - offset of the first byte contained in this request
//   x-file-size  - total size of the complete file
// The multipart body carries the bytes from x-start-byte onwards. They are
// appended to the stored file when x-start-byte > 0, otherwise the file is
// (re)created. Replies "ok" once the parser has finished AND the data has been
// flushed to disk.
app.route("/upload").post((req, res, next) => {
  const xFileId = req.headers["x-file-id"];
  const xStartByte = parseInt(req.headers["x-start-byte"], 10);
  const xFileSize = parseInt(req.headers["x-file-size"], 10);

  if (
    !xFileId ||
    Number.isNaN(xStartByte) ||
    Number.isNaN(xFileSize) ||
    xStartByte < 0
  ) {
    return res
      .status(400)
      .json("Missing or invalid x-file-id, x-start-byte or x-file-size header");
  }
  if (xStartByte >= xFileSize) {
    return res.json("File already uploaded");
  }
  // NOTE(review): req.busboy is expected to be attached by the connect-busboy
  // middleware for multipart bodies only - confirm against the installed version.
  if (!req.busboy) {
    return res.status(400).json("Expected a multipart/form-data request body");
  }

  let openWrites = 0; // write streams that have not been closed yet
  let parsingDone = false; // set once busboy has consumed the whole request
  let failed = false;

  // Answer with a 500 exactly once, whichever stream fails first.
  const fail = () => {
    if (failed) {
      return;
    }
    failed = true;
    if (!res.headersSent) {
      res.status(500).json("Upload failed");
    }
  };

  // Reply only after parsing is complete and every byte has reached the disk,
  // so a follow-up GET /:id never observes a stale size.
  const respondWhenDone = () => {
    if (parsingDone && openWrites === 0 && !failed && !res.headersSent) {
      res.json("ok");
    }
  };

  req.busboy.on("file", (fieldname, file, filename) => {
    // Reuse the name recorded for this id on an earlier attempt. Otherwise
    // record the client's name, reduced to its last path segment so it cannot
    // point outside uploadPath.
    const knownName = database[xFileId];
    const storedName =
      typeof knownName === "string" ? knownName : path.basename(filename);
    database[xFileId] = storedName;

    const completeFilePath = path.join(uploadPath, storedName);
    console.log(`Upload of '${storedName}' started`);

    // Create a write stream of the new file
    const appendMode = xStartByte > 0;
    console.log(appendMode ? "APPEND Mode" : "WRITE Mode");
    const fstream = fs.createWriteStream(completeFilePath, {
      flags: appendMode ? "a" : "w",
    });
    openWrites += 1;

    file.on("error", (e) => {
      console.log("file.on.error", e);
      fail();
    });
    file.on("limit", (e) => console.log("Limit reached", e));
    fstream.on("error", (err) => {
      console.log("fileStream error>>>>>", err);
      // Keep draining the part so the parser can finish instead of stalling
      // behind a destination that no longer accepts data.
      file.resume();
      fail();
    });
    // On finish of the upload
    fstream.on("close", () => {
      openWrites -= 1;
      console.log(`Upload of '${storedName}' finished`);
      respondWhenDone();
    });

    // Pipe it through
    file.pipe(fstream);
  });

  req.busboy.on("finish", () => {
    parsingDone = true;
    respondWhenDone();
  });
  req.busboy.on("error", (err) => {
    console.log(`Busboy error`, err);
    fail();
  });

  req.pipe(req.busboy); // Pipe it through busboy
});
// Start the HTTP server on port 6969 and log once it accepts connections.
app.listen(6969, () => console.log("listening on 6969"));
client.js
var request = require("request");
var fs = require("fs");
var path = require("path");
// Name of the file to upload; it is resolved relative to this script's directory.
const filebasename = "35gb.zip";
const filePath = path.join(__dirname, filebasename);
/**
 * Get the information about the local file: its size and an id for the file.
 *
 * The id combines name, size and modification time, so editing the file
 * produces a new id.
 *
 * @returns {{ fileId: string, size: number }} Id and size in bytes.
 * @throws {Error} The underlying fs error when the file cannot be inspected.
 */
function getFileInfo() {
  try {
    // statSync needs no file descriptor; the previous openSync + fstatSync
    // pair never closed the descriptor it opened.
    const fileStat = fs.statSync(filePath);
    return {
      fileId: `${filebasename}-${fileStat.size}-${fileStat.mtimeMs}`,
      size: fileStat.size,
    };
  } catch (error) {
    console.error(error);
    // Rethrow: returning undefined here only made the destructuring below
    // fail with an unrelated TypeError that hid the real cause.
    throw error;
  }
}
const { fileId, size } = getFileInfo();
/**
 * Send an api request to the server asking how many bytes have already been
 * uploaded (if any).
 *
 * @returns {Promise<number>} Resolves with the `bytes` field of the server's
 *   JSON reply; rejects on a transport error or an unparsable reply.
 */
function info() {
  // The id contains the file name, so escape it before using it as a path segment.
  const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;
  const options = {
    method: "GET",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "application/json",
    },
  };
  return new Promise((resolve, reject) => {
    // request invokes its callback as (error, response, body); with only two
    // parameters the second one is the response object, not the body text.
    request(options, (err, response, body) => {
      if (err) {
        console.log(err);
        return reject(err);
      }
      try {
        const { bytes } = JSON.parse(body);
        resolve(bytes);
      } catch (parseError) {
        // A throw inside this callback would escape the promise, so turn a
        // malformed reply into a rejection instead.
        reject(parseError);
      }
    });
  });
}
/**
 * Send the upload request, starting at the byte offset the server reports as
 * already stored. On a transport error the whole procedure is started again,
 * which resumes from the server's new offset.
 *
 * @returns {Promise<void>} Resolves once the request has been issued (not
 *   when it completes); rejects if the offset lookup fails.
 */
async function upload() {
  const bytesAlreadyUploaded = await info();
  if (bytesAlreadyUploaded >= size) {
    // Nothing left to send; avoid posting an empty body over and over.
    console.log("File already uploaded");
    return;
  }

  const url = "http://localhost:6969/upload";
  const uploadStream = fs.createReadStream(filePath, {
    start: bytesAlreadyUploaded, // this will be 0 in case the file is new
    highWaterMark: 2 * 1024 * 1024,
  });
  const options = {
    method: "POST",
    url,
    headers: {
      "Content-Type": "multipart/form-data",
      "x-file-id": fileId,
      "x-start-byte": bytesAlreadyUploaded,
      "x-file-size": size,
      "x-file-name": filebasename,
    },
    // formData is a request option of its own, a sibling of `headers`.
    formData: {
      image: uploadStream,
    },
  };

  // request invokes its callback as (error, response, body).
  request(options, (err, response, body) => {
    if (err) {
      // Basically if an error occurs, EPIPE or Connection timed out or any
      // other error, we will resume uploading from the point where it was
      // last uploaded.
      console.log(`Error ${err.code}. Resuming upload...`);
      upload().catch((retryError) => {
        // The retry can fail too (e.g. the server is still unreachable);
        // log it instead of leaving an unhandled rejection.
        console.error(retryError);
      });
      return;
    }
    console.log("body", typeof body, body);
  });
}
// Entry point: start the upload and report a failure instead of leaving the
// returned promise floating (which would surface as an unhandled rejection).
(async () => {
  try {
    await upload();
  } catch (err) {
    console.error(err);
    process.exitCode = 1;
  }
})();