浅谈Node.js之异步流控制

前言

在没有深度使用函数回调的经验的时候,去看这些内容还是有一点吃力的。由于Node.js独特的异步特性,才出现了“回调地狱”的问题,这篇文章中,我比较详细的记录了如何解决异步流问题。

文章会很长,而且这篇是对异步流模式的解释。文中会使用一个简单的网络蜘蛛的例子,它的作用是抓取指定URL的网页内容并保存在项目中,在文章的最后,可以找到整篇文章中的源码demo。

1.原生JavaScript模式

本篇不针对初学者,因此会省略掉大部分的基础内容的讲解:

(spider_v1.js)

function spider(url,callback) {
const filename = utilities.urlToFilename(url);
console.log(filename: ${filename});

fs.exists(filename,exists => {
if (!exists) {
console.log(Downloading ${url});

  request(url,(err,response,body) => {
    if (err) {
      callback(err);
    } else {
      mkdirp(path.dirname(filename),err => {
        if (err) {
          callback(err);
        } else {
          fs.writeFile(filename,body,err => {
            if (err) {
              callback(err);
            } else {
              callback(null,filename,true);
            }
          });
        }
      });
    }
  });
} else {
  callback(null,false);
}

});
}

spider(process.argv[2],downloaded) => {
if (err) {
console.log(err);
} else if (downloaded) {
console.log(Completed the download of ${filename});
} else {
console.log(${filename} was already downloaded);
}
});

上边的代码的流程大概是这样的:

  1. 把url转换成filename
  2. 判断该文件名是否存在,若存在直接返回,否则进入下一步
  3. 发请求,获取body
  4. 把body写入到文件

这是一个非常简单版本的蜘蛛,他只能抓取一个url的内容,看到上边的回调多么令人头疼。那么我们开始进行优化。

首先,if else 这种方式可以进行优化,这个很简单,不用多说,放一个对比效果

rush:js;"> /// before if (err) { callback(err); } else { callback(null,true); }

/// after
if (err) {
return callback(err);
}
callback(null,true);

代码这么写,嵌套就会少一层,但经验丰富的程序员会认为,这样写过重强调了error,我们编程的重点应该放在处理正确的数据上,在可读性上也存在这样的要求。

一个优化是函数拆分,上边代码中的spider函数中,可以把下载文件和保存文件拆分出去。

(spider_v2.js)

function saveFile(filename,contents,callback) {
mkdirp(path.dirname(filename),err => {
if (err) {
return callback(err);
}
fs.writeFile(filename,callback);
});
}

function download(url,callback) {
console.log(Downloading ${url});

request(url,body) => {
if (err) {
return callback(err);
}
saveFile(filename,err => {
if (err) {
return callback(err);
}
console.log(Downloaded and saved: ${url});
callback(null,body);
});
})
}

function spider(url,exists => {
if (exists) {
return callback(null,false);
}
download(url,err => {
if (err) {
return callback(err);
}
callback(null,true);
})
});
}

spider(process.argv[2],downloaded) => {
if (err) {
console.log(err);
} else if (downloaded) {
console.log(Completed the download of ${filename});
} else {
console.log(${filename} was already downloaded);
}
});

上边的代码基本上是采用原生优化后的结果,但这个蜘蛛的功能太过简单,我们现在需要抓取某个网页中的所有url,这样才会引申出串行和并行的问题。

(spider_v3.js)

function saveFile(filename,body);
});
})
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl,nesting,callback) {
if (nesting === 0) {
return process.nextTick(callback);
}

const links = utilities.getPageLinks(currentUrl,body);

function iterate(index) {
if (index === links.length) {
return callback();
}
spider(links[index],nesting - 1,err => {
if (err) {
return callback(err);
}
iterate((index + 1));
})
}

iterate(0);
}

function spider(url,callback) {
const filename = utilities.urlToFilename(url);

fs.readFile(filename,"utf8",body) => {
if (err) {
if (err.code !== 'ENOENT') {
return callback(err);
}
return download(url,body) => {
if (err) {
return callback(err);
}
spiderLinks(url,callback);
});
}

spiderLinks(url,callback);

});
}

spider(process.argv[2],2,downloaded) => {
if (err) {
console.log(err);
} else if (downloaded) {
console.log(Completed the download of ${filename});
} else {
console.log(${filename} was already downloaded);
}
});

上边的代码相比之前的代码多了两个核心功能,首先是通过辅助类获取到了某个body中的links:

rush:js;"> const links = utilities.getPageLinks(currentUrl,body);

内部实现就不解释了,另一个核心代码就是:

{ if (err) { return callback(err); } iterate((index + 1)); }) }

iterate(0);
}

可以说上边这一小段代码,就是采用原生实现异步串行的pattern了。除了这些之外,还引入了nesting的概念,通过这是这个属性,可以控制抓取层次。

到这里我们就完整的实现了串行的功能,考虑到性能,我们要开发并行抓取的功能

(spider_v4.js)

function saveFile(filename,body);
if (links.length === 0) {
return process.nextTick(callback);
}

let completed = 0,hasErrors = false;

function done(err) {
if (err) {
hasErrors = true;
return callback(err);
}

if (++completed === links.length && !hasErrors) {
  return callback();
}

}

links.forEach(link => {
spider(link,done);
});
}

const spidering = new Map();

function spider(url,callback) {
if (spidering.has(url)) {
return process.nextTick(callback);
}

spidering.set(url,true);

const filename = utilities.urlToFilename(url);

/// In this pattern,there will be some issues.
/// Possible problems to download the same url again and again。
fs.readFile(filename,downloaded) => {
if (err) {
console.log(err);
} else if (downloaded) {
console.log(Completed the download of ${filename});
} else {
console.log(${filename} was already downloaded);
}
});

这段代码同样很简单,也有两个核心内容一个是如何实现并发:

rush:js;"> /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl,done); }); }

上边的代码可以说是实现并发的一个pattern。利用循环遍历来实现。另一个核心是,既然是并发的,那么利用 fs.exists 就会存在问题,可能会重复下载同一文件,这里的解决方案是:

  • 使用Map缓存某一url,url应该作为key

现在我们又有了新的需求,要求限制同时并发的最大数,那么在这里就引进了一个我认为最重要的概念:队列。

(task-Queue.js)

rush:js;"> class TaskQueue { constructor(concurrency) { this.concurrency = concurrency; this.running = 0; this.queue = []; }

pushTask(task) {
this.queue.push(task);
this.next();
}

next() {
while (this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift();
task(() => {
this.running--;
this.next();
});
this.running++;
}
}
}

module.exports = TaskQueue;

上边的代码就是队列的实现代码,核心是 next() 方法,可以看出,当task加入队列中后,会立刻执行,这不是说这个任务一定马上执行,而是指的是next会立刻调用

(spider_v5.js)

function saveFile(filename,hasErrors = false;

links.forEach(link => {
/// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
/// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
downloadQueue.pushTask(done => {
spider(link,err => {
/// 这里表示,只要发生错误,队列就会退出
if (err) {
hasErrors = true;
return callback(err);
}
if (++completed === links.length && !hasErrors) {
callback();
}

    done();
  });
});

});
}

const spidering = new Map();

function spider(url,downloaded) => {
if (err) {
console.log(error: ${err});
} else if (downloaded) {
console.log(Completed the download of ${filename});
} else {
console.log(${filename} was already downloaded);
}
});

因此,为了限制并发的个数,只需在 spiderLinks 方法中,把task遍历放入队列就可以了。这相对来说很简单。

到这里为止,我们使用原生JavaScript实现了一个有相对完整功能的网络蜘蛛,既能串行,也能并发,还可以控制并发个数。

2.使用async库

把不同的功能放到不同的函数中,会给我们带来巨大的好处,async库十分流行,它的性能也不错,它内部基于callback。

(spider_v6.js)

function download(url,callback) {
console.log(Downloading ${url});

let body;

series([
callback => {
request(url,resBody) => {
if (err) {
return callback(err);
}
body = resBody;
callback();
});
},mkdirp.bind(null,path.dirname(filename)),callback => {
fs.writeFile(filename,callback);
}
],err => {
if (err) {
return callback(err);
}
console.log(Downloaded and saved: ${url});
callback(null,body);
});
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl,body);
if (links.length === 0) {
return process.nextTick(callback);
}

eachSeries(links,(link,cb) => {
"use strict";
spider(link,cb);
},callback);
}

const spidering = new Map();

function spider(url,true);

const filename = utilities.urlToFilename(url);

fs.readFile(filename,1,downloaded) => {
if (err) {
console.log(err);
} else if (downloaded) {
console.log(Completed the download of ${filename});
} else {
console.log(${filename} was already downloaded);
}
});

在上边的代码中,我们只使用了async的三个功能

rush:js;"> const series = require("async/series"); // 串行 const eachSeries = require("async/eachSeries"); // 并行 const queue = require("async/queue"); // 队列

由于比较简单,就不做解释了。async中的队列的代码在(spider_v7.js)中,和上边我们自定义的队列很相似,也不做更多解释了。

3.Promise

Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。

其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格

(spider_v8.js)

function saveFile(filename,filename) {
console.log(Downloading ${url});

let body;

return request(url)
.then(response => {
"use strict";
body = response.body;
return mkdirp(path.dirname(filename));
})
.then(() => writeFile(filename,body))
.then(() => {
"use strict";
console.log(Downloaded adn saved: ${url});
return body;
});
}

/// promise编程的本质就是为了解决函数中设置回调函数的问题
/// 通过中间层promise来实现异步函数同步化
function spiderLinks(currentUrl,nesting) {
let promise = Promise.resolve();
if (nesting === 0) {
return promise;
}

const links = utilities.getPageLinks(currentUrl,body);

links.forEach(link => {
"use strict";
promise = promise.then(() => spider(link,nesting - 1));
});

return promise;
}

function spider(url,nesting) {
const filename = utilities.urlToFilename(url);

return readFile(filename,"utf8")
.then(
body => spiderLinks(url,nesting),err => {
"use strict";
if (err.code !== 'ENOENT') {
/// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
throw err;
}
return download(url,filename)
.then(body => spiderLinks(url,nesting));
}
);
}

spider(process.argv[2],1)
.then(() => {
"use strict";
console.log('Download complete');
})
.catch(err => {
"use strict";
console.log(err);
});

可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。

在设计api的时候,应该支持两种方式,及支持callback,又支持promise

{ "use strict"; process.nextTick(() => { const result = dividend / divisor; if (isNaN(result) || !Number.isFinite(result)) { const error = new Error("Invalid operands"); if (cb) { cb(error); } return reject(error); }
  if (cb) {
    cb(null,result);
  }
  resolve(result);
});

});
}

asyncDivision(10,result) => {
"use strict";
if (err) {
return console.log(err);
}
console.log(result);
});

asyncDivision(22,11)
.then((result) => console.log(result))
.catch((err) => console.log(err));

4.Generator

Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。

(spider_v9.js)

rush:js;"> const thunkify = require("thunkify"); const co = require("co"); const path = require("path"); const utilities = require("./utilities");

const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);

function* download(url,filename) {
console.log(Downloading ${url});

const response = yield request(url);
console.log(response);

const body = response[1];
yield mkdirp(path.dirname(filename));

yield writeFile(filename,body);

console.log(Downloaded and saved ${url});
return body;
}

function* spider(url,nesting) {
const filename = utilities.urlToFilename(url);

let body;

try {
body = yield readFile(filename,"utf8");
} catch (err) {
if (err.code !== 'ENOENT') {
throw err;
}
body = yield download(url,filename);
}

yield spiderLinks(url,nesting);
}

function* spiderLinks(currentUrl,nesting) {
if (nesting === 0) {
return nextTick();
}

const links = utilities.getPageLinks(currentUrl,body);

for (let i = 0; i < links.length; i++) {
yield spider(links[i],nesting - 1);
}
}

/// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息
co(function* () {
try {
yield spider(process.argv[2],1);
console.log('Download complete');
} catch (err) {
console.log(err);
}
});

总结

我并没有写promise和generator并发的代码。以上这些内容来自于这本书nodejs-design-patterns

demo下载

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程之家。

相关文章

这篇文章主要介绍“基于nodejs的ssh2怎么实现自动化部署”的...
本文小编为大家详细介绍“nodejs怎么实现目录不存在自动创建...
这篇“如何把nodejs数据传到前端”文章的知识点大部分人都不...
本文小编为大家详细介绍“nodejs如何实现定时删除文件”,内...
这篇文章主要讲解了“nodejs安装模块卡住不动怎么解决”,文...
今天小编给大家分享一下如何检测nodejs有没有安装成功的相关...