实现并发控制函数

共 3023字,需浏览 7分钟

 ·

2021-12-30 12:28


neo-async 和 async 模块都是为了解决嵌套金字塔,和异步流程控制而生,neo-async 是 async 的替代品,因为 neo-async 比 async 的性能更快。


为什么要使用这个模块,举个例子,比如我们需要读取多个文件,将读取文件的结果保存在数组中。


let list = []fs.readFile('file1', 'utf8', function (err, res) {  list.push(res)  fs.readFile('file2', 'utf8', function (err, res) {    list.push(res)    fs.readFile('file3', 'utf8', function (err, res) {      list.push(res)      console.log(list)    })  })})


使用 neo-async 我们可以这样写:

function getFile(file, callback) {  fs.readFile(file, 'utf8', function (err, res) {    if (err) {      return callback(err);    }    callback(null, res);  });}
async.map(['file1','file2','file3'], getFile, function(err, results) { // 返回一个新数组 console.log(results)});

each

我们要学习的第一个函数是 each,注意,由于此函数并行地将 iterator 应用于每个项,因此不能保证 iterator 函数将按顺序完成。


// arrayvar order = [];var array = [1, 3, 2];var iterator = function(num, done) {  setTimeout(function() {    order.push(num);    done();  }, num * 10);};async.each(array, iterator, function(err, res) {  console.log(res); // undefined  console.log(order); // [1, 2, 3]});


另外,eachLimit 与 each 相同,但一次最多运行多少异步操作,可以自己设置限制。

eachLimit

// arrayvar order = [];var array = [1, 5, 3, 4, 2];var iterator = function(num, done) {  setTimeout(function() {    order.push(num);    done();  }, num * 10);};async.eachLimit(array, 2, iterator, function(err, res) {  console.log(res); // undefined  console.log(order); // [1, 3, 5, 2, 4]});


控制并发数量的实现原理如下:

var noop = function noop() {};
function timesSync(n, iterator) { var index = -1; while (++index < n) { iterator(index); }}function eachLimit(collection, limit, iterator, callback) { callback = callback || noop; var size = collection.length; var sync = false; var started = 0; var completed = 0;
timesSync(limit > size ? size : limit, iterate);
function iterate() { if (started < size) { iterator(collection[started++], done); } }
function done(err, bool) { if (err) { callback(err); } else if (++completed === size) { callback(null); } else { iterate(); } }}

promise


我们都知道异步编程的方式有很多种,包括回调和 promise 的形式,下面是通过 promise 控制并发数量。


/** * @params list {Array} - 要迭代的数组 * @params limit {Number} - 并发数量控制数 * @params asyncHandle {Function} - 对`list`的每一个项的处理函数,参数为当前处理项,必须 return 一个Promise来确定是否继续进行迭代 * @return {Promise} - 返回一个 Promise 值来确认所有数据是否迭代完成 */let eachLimit = (list, limit, asyncHandle) => {  let recursion = (arr) => {    return asyncHandle(arr.shift()).then((res) => {      if (arr.length !== 0) {        return recursion(arr) // 数组还未迭代完,递归继续进行迭代      }    })  };
let listCopy = [].concat(list); let asyncList = []; // 正在进行的所有并发异步操作 while (limit--) { asyncList.push(recursion(listCopy)); } return Promise.all(asyncList); // 所有并发异步操作都完成后,本次并发控制迭代完成}
const order = []const timeout = i => new Promise(resolve => { setTimeout(() => { order.push(i) resolve(i) }, i * 10)});eachLimit([1, 5, 3, 4, 2], 2, timeout).then((res) => { console.log(order)})


浏览 12
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报