为什么 Node 会一直占用通过流发送的文件?

问题描述

我正在编写一个函数,通过流向客户端发送视频文件。它工作正常,只是每个请求都会打开一个文件描述符,却从不关闭。这意味着这些文件之后无法删除,因为它们一直被 Node 进程占用。

以下是导致该问题的函数:

exports.stream = (req,res,next) => {
    const range = req.headers.range;
    const volumeName = req.query.volume;
    const folderPath = req.query.folder;
    const fileName = req.query.file;
    if (!range) return res.sendStatus(416);
    try {
        let volume = Services.volumes.get.byName(volumeName);
        let path = volume.location + folderPath + fileName;
        let pathExists = Services.files.get.exists(path);
        if (!pathExists) return res.status(404).send('unable to stream file,path not exists');
        
        let stats = fs.statSync(path);
        if (!stats.isFile()) {
            return res.status(400).send('requested path is not a file: ' + path);
        }
        const fileSize = stats.size;

        const parts = range.replace(/bytes=/,"").split("-");
        const start = parseInt(parts[0],10);
        let end = parts[1] ? parseInt(parts[1],10) : fileSize - 1;
        let chunksize = (end - start) + 1;
        const maxChunk = 1024 * 1024;
        if (chunksize > maxChunk) {
            end = start + maxChunk - 1;
            chunksize = (end - start) + 1;
        }

        res.writeHead(206,{
            'Content-Range': `bytes ${start}-${end}/${fileSize}`,'Accept-Ranges': 'bytes','Content-Length': chunksize,'Content-Type': 'video/mp4',});

        let stream = fs.createReadStream(path,{start: start,end: end,autoClose: true});
        stream.on('open',function () {
            stream.pipe(res);
        });
        stream.on('error',function (err) {
            res.end(err);
        });
        stream.on('close',function () {
            stream.destroy();
        });
    } catch (e) {
        next(new Error('unable to stream path: ' + e));
    }
};

解决方法

我找到了防止流保持打开状态的解决方案。

  • 一方面,我必须限制在每个请求中发送的信息量(又名chunksize)。
  • 另一方面,我使用了 pipeline 而不是 pipe。两者都用于把可读流连接到可写流,但行为并不相同:当目标流出错或提前关闭时(例如客户端中途断开连接),pipe 不会销毁源流,文件描述符因此一直保持打开;而 pipeline 会在出错或完成时销毁所有相关的流。这个链接为我提供了一个起点。

代码如下(仍有待完善):

exports.stream = ({config}) => (req,res,next) => {
    let volumeName = req.query.volume;
    let folderPath = req.query.folder;
    let fileName = req.query.file;
    try {
        let volume = Services.volumes.get.byName(volumeName);
        let path = volume.location + folderPath + fileName;
        let exists = Services.files.get.exists(path);
        if (!exists) {
            return res.status(404).send('unable to stream file,path not exists');
        }

        let stats = fs.statSync(path);
        if (!stats.isFile()) {
            return res.status(400).send('requested path is not a file: ' + path);
        }

        const fileSize = stats.size;
        let maxchunksize = 1024 * 1024;
        let headers = {};
        let readable;

        if (req.headers['range']) {
            const range = req.headers.range;
            const parts = range.replace(/bytes=/,"").split("-");
            let start = parseInt(parts[0],10);
            let end = parts[1] ? parseInt(parts[1],10) : fileSize - 1;
            let chunksize = (end - start) + 1;
            if (start > end || start < 0 || end > fileSize - 1) {
                headers['Content-Range'] = '*/' + fileSize;
                res.writeHead(416,headers);
                return res.end();
            }
            if (chunksize > maxchunksize) {
                end = start + maxchunksize - 1;
                chunksize = (end - start) + 1;
            }
            headers['Accept-Ranges'] = 'bytes';
            headers['Content-Type'] = 'video/mp4';
            headers['Content-Range'] = 'bytes ' + start + '-' + end + '/' + fileSize;
            headers['Content-Length'] = chunksize;
            res.writeHead(206,headers);
            readable = fs.createReadStream(path,{start: start,end: end,autoClose: true});

        } else {
            headers['Content-Length'] = fileSize;
            headers['Content-Type'] = 'video/mp4';
            res.writeHead(200,headers);
            readable = fs.createReadStream(path);
        }

        stream.pipeline(readable,err => {
            if (err) console.log(err);
        });
    } catch (e) {
        next(new Error('unable to stream path: ' + e));
    }
};