Node.js教程之异步流控制
沉沙 2018-05-15 来源 : 阅读 468 评论 0

摘要:由于Node.js独特的异步特性,才出现了“回调地狱”的问题,本篇Node.js教程中,比较详细的记录了如何解决异步流问题。希望本篇Node.js教程对大家有所帮助。

在没有深度使用函数回调的经验的时候,去看这些内容还是有一点吃力的。由于Node.js独特的异步特性,才出现了“回调地狱”的问题,这篇文章中,比较详细的记录了如何解决异步流问题。

文章会很长,而且这篇是对异步流模式的解释。文中会使用一个简单的网络蜘蛛的例子,它的作用是抓取指定URL的网页内容并保存在项目中。

1.原生JavaScript模式

本篇不针对初学者,因此会省略掉大部分的基础内容的讲解:

(spider_v1.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
 
function spider(url, callback) {
    const filename = utilities.urlToFilename(url);
    console.log(`filename: ${filename}`);
 
    fs.exists(filename, exists => {
        if (!exists) {
            console.log(`Downloading ${url}`);
 
            request(url, (err, response, body) => {
                if (err) {
                    callback(err);
                } else {
                    mkdirp(path.dirname(filename), err => {
                        if (err) {
                            callback(err);
                        } else {
                            fs.writeFile(filename, body, err => {
                                if (err) {
                                    callback(err);
                                } else {
                                    callback(null, filename, true);
                                }
                            });
                        }
                    });
                }
            });
        } else {
            callback(null, filename, false);
        }
    });}
spider(process.argv[2], (err, filename, downloaded) => {
    if (err) {
        console.log(err);
    } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
    } else {
        console.log(`${filename} was already downloaded`);
    }});

上边的代码的流程大概是这样的:

· 把url转换成filename

· 判断该文件名是否存在,若存在直接返回,否则进入下一步

· 发请求,获取body

· 把body写入到文件中

这是一个非常简单版本的蜘蛛,他只能抓取一个url的内容,看到上边的回调多么令人头疼。那么我们开始进行优化。

首先,if else 这种方式可以进行优化,这个很简单,不用多说,放一个对比效果:

/// beforeif (err) {
    callback(err);} else {
    callback(null, filename, true);}
/// afterif (err) {
    return callback(err);}callback(null, filename, true);

代码这么写,嵌套就会少一层,但经验丰富的程序员会认为,这样写过重强调了error,我们编程的重点应该放在处理正确的数据上,在可读性上也存在这样的要求。

另一个优化是函数拆分,上边代码中的spider函数中,可以把下载文件和保存文件拆分出去。

(spider_v2.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
 
function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename), err => {
        if (err) {
            return callback(err);
        }
        fs.writeFile(filename, contents, callback);
    });}
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
 
    request(url, (err, response, body) => {
        if (err) {
            return callback(err);
        }
        saveFile(filename, body, err => {
            if (err) {
                return callback(err);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    })}
function spider(url, callback) {
    const filename = utilities.urlToFilename(url);
    console.log(`filename: ${filename}`);
 
    fs.exists(filename, exists => {
        if (exists) {
            return callback(null, filename, false);
        }
        download(url, filename, err => {
            if (err) {
                return callback(err);
            }
            callback(null, filename, true);
        })
    });}
spider(process.argv[2], (err, filename, downloaded) => {
    if (err) {
        console.log(err);
    } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
    } else {
        console.log(`${filename} was already downloaded`);
    }});

上边的代码基本上是采用原生优化后的结果,但这个蜘蛛的功能太过简单,我们现在需要抓取某个网页中的所有url,这样才会引申出串行和并行的问题。

(spider_v3.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
 
function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename), err => {
        if (err) {
            return callback(err);
        }
        fs.writeFile(filename, contents, callback);
    });}
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
 
    request(url, (err, response, body) => {
        if (err) {
            return callback(err);
        }
        saveFile(filename, body, err => {
            if (err) {
                return callback(err);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    })}
/// 最大的启发是实现了如何异步循环遍历数组function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
 
    function iterate(index) {
        if (index === links.length) {
            return callback();
        }
        spider(links[index], nesting - 1, err => {
            if (err) {
                return callback(err);
            }
            iterate((index + 1));
        })
    }
 
    iterate(0);}
function spider(url, nesting, callback) {
    const filename = utilities.urlToFilename(url);
 
    fs.readFile(filename, "utf8", (err, body) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return callback(err);
            }
            return download(url, filename, (err, body) => {
                if (err) {
                    return callback(err);
                }
                spiderLinks(url, body, nesting, callback);
            });
        }
 
        spiderLinks(url, body, nesting, callback);
    });}
spider(process.argv[2], 2, (err, filename, downloaded) => {
    if (err) {
        console.log(err);
    } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
    } else {
        console.log(`${filename} was already downloaded`);
    }});

上边的代码相比之前的代码多了两个核心功能,首先是通过辅助类获取到了某个body中的links:

const links = utilities.getPageLinks(currentUrl, body);

内部实现就不解释了,另一个核心代码就是:

/// 最大的启发是实现了如何异步循环遍历数组function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
 
    function iterate(index) {
        if (index === links.length) {
            return callback();
        }
        spider(links[index], nesting - 1, err => {
            if (err) {
                return callback(err);
            }
            iterate((index + 1));
        })
    }
 
    iterate(0);}

可以说上边这一小段代码,就是采用原生实现异步串行的pattern了。除了这些之外,还引入了nesting的概念,通过这是这个属性,可以控制抓取层次。

到这里我们就完整的实现了串行的功能,考虑到性能,我们要开发并行抓取的功能。

(spider_v4.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
 
function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename), err => {
        if (err) {
            return callback(err);
        }
        fs.writeFile(filename, contents, callback);
    });}
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
 
    request(url, (err, response, body) => {
        if (err) {
            return callback(err);
        }
        saveFile(filename, body, err => {
            if (err) {
                return callback(err);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    })}
/// 最大的启发是实现了如何异步循环遍历数组function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
 
    let completed = 0, hasErrors = false;
 
    function done(err) {
        if (err) {
            hasErrors = true;
            return callback(err);
        }
 
        if (++completed === links.length && !hasErrors) {
            return callback();
        }
    }
 
    links.forEach(link => {
        spider(link, nesting - 1, done);
    });}
const spidering = new Map();
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
 
    spidering.set(url, true);
 
    const filename = utilities.urlToFilename(url);
 
    /// In this pattern, there will be some issues.
    /// Possible problems to download the same url again and again。
    fs.readFile(filename, "utf8", (err, body) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return callback(err);
            }
            return download(url, filename, (err, body) => {
                if (err) {
                    return callback(err);
                }
                spiderLinks(url, body, nesting, callback);
            });
        }
 
        spiderLinks(url, body, nesting, callback);
    });}
spider(process.argv[2], 2, (err, filename, downloaded) => {
    if (err) {
        console.log(err);
    } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
    } else {
        console.log(`${filename} was already downloaded`);
    }});

这段代码同样很简单,也有两个核心内容。一个是如何实现并发:

/// 最大的启发是实现了如何异步循环遍历数组function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
 
    let completed = 0, hasErrors = false;
 
    function done(err) {
        if (err) {
            hasErrors = true;
            return callback(err);
        }
 
        if (++completed === links.length && !hasErrors) {
            return callback();
        }
    }
 
    links.forEach(link => {
        spider(link, nesting - 1, done);
    });}

上边的代码可以说是实现并发的一个pattern。利用循环遍历来实现。另一个核心是,既然是并发的,那么利用fs.exists就会存在问题,可能会重复下载同一文件,这里的解决方案是:

· 使用Map缓存某一url,url应该作为key

现在我们又有了新的需求,要求限制同时并发的最大数,那么在这里就引进了一个我认为最重要的概念:队列。

(task-Queue.js)

class TaskQueue {
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.running = 0;
        this.queue = [];
    }
 
    pushTask(task) {
        this.queue.push(task);
        this.next();
    }
 
    next() {
        while (this.running < this.concurrency && this.queue.length) {
            const task = this.queue.shift();
            task(() => {
                this.running--;
                this.next();
            });
            this.running++;
        }
    }}
module.exports = TaskQueue;


上边的代码就是队列的实现代码,核心是next()方法,可以看出,当task加入队列中后,会立刻执行,这不是说这个任务一定马上执行,而是指的是next会立刻调用。

(spider_v5.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);
 
function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename), err => {
        if (err) {
            return callback(err);
        }
        fs.writeFile(filename, contents, callback);
    });}
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
 
    request(url, (err, response, body) => {
        if (err) {
            return callback(err);
        }
        saveFile(filename, body, err => {
            if (err) {
                return callback(err);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    })}
/// 最大的启发是实现了如何异步循环遍历数组function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
 
    let completed = 0, hasErrors = false;
 
    links.forEach(link => {
        /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
        /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
        downloadQueue.pushTask(done => {
            spider(link, nesting - 1, err => {
                /// 这里表示,只要发生错误,队列就会退出
                if (err) {
                    hasErrors = true;
                    return callback(err);
                }
                if (++completed === links.length && !hasErrors) {
                    callback();
                }
 
                done();
            });
        });
 
    });}
const spidering = new Map();
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
 
    spidering.set(url, true);
 
    const filename = utilities.urlToFilename(url);
 
    /// In this pattern, there will be some issues.
    /// Possible problems to download the same url again and again。
    fs.readFile(filename, "utf8", (err, body) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return callback(err);
            }
            return download(url, filename, (err, body) => {
                if (err) {
                    return callback(err);
                }
                spiderLinks(url, body, nesting, callback);
            });
        }
 
        spiderLinks(url, body, nesting, callback);
    });}
spider(process.argv[2], 2, (err, filename, downloaded) => {
    if (err) {
        console.log(`error: ${err}`);
    } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
    } else {
        console.log(`${filename} was already downloaded`);
    }});

因此,为了限制并发的个数,只需在spiderLinks方法中,把task遍历放入队列就可以了。这相对来说很简单。

到这里为止,我们使用原生JavaScript实现了一个有相对完整功能的网络蜘蛛,既能串行,也能并发,还可以控制并发个数。

2.使用async库

把不同的功能放到不同的函数中,会给我们带来巨大的好处,async库十分流行,它的性能也不错,它内部基于callback。

(spider_v6.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");
 
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
 
    let body;
 
    series([
        callback => {
            request(url, (err, response, resBody) => {
                if (err) {
                    return callback(err);
                }
                body = resBody;
                callback();
            });
        },
        mkdirp.bind(null, path.dirname(filename)),
        callback => {
            fs.writeFile(filename, body, callback);
        }
    ], err => {
        if (err) {
            return callback(err);
        }
        console.log(`Downloaded and saved: ${url}`);
        callback(null, body);
    });}
/// 最大的启发是实现了如何异步循环遍历数组function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
 
    eachSeries(links, (link, cb) => {
        "use strict";
        spider(link, nesting - 1, cb);
    }, callback);}
const spidering = new Map();
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
 
    spidering.set(url, true);
 
    const filename = utilities.urlToFilename(url);
 
    fs.readFile(filename, "utf8", (err, body) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return callback(err);
            }
            return download(url, filename, (err, body) => {
                if (err) {
                    return callback(err);
                }
                spiderLinks(url, body, nesting, callback);
            });
        }
 
        spiderLinks(url, body, nesting, callback);
    });}
spider(process.argv[2], 1, (err, filename, downloaded) => {
    if (err) {
        console.log(err);
    } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
    } else {
        console.log(`${filename} was already downloaded`);
    }});

在上边的代码中,我们只使用了async的三个功能:

const series = require("async/series"); // 串行
const eachSeries = require("async/eachSeries"); // 并行
const queue = require("async/queue"); // 队列

由于比较简单,就不做解释了。async中的队列的代码在(spider_v7.js)中,和上边我们自定义的队列很相似,也不做更多解释了。

3.Promise

Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。

其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格

(spider_v8.js)

const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");
 
function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename), err => {
        if (err) {
            return callback(err);
        }
        fs.writeFile(filename, contents, callback);
    });}
function download(url, filename) {
    console.log(`Downloading ${url}`);
 
    let body;
 
    return request(url)
        .then(response => {
            "use strict";
            body = response.body;
            return mkdirp(path.dirname(filename));
        })
        .then(() => writeFile(filename, body))
        .then(() => {
            "use strict";
            console.log(`Downloaded adn saved: ${url}`);
            return body;
        });}
/// promise编程的本质就是为了解决在函数中设置回调函数的问题/// 通过中间层promise来实现异步函数同步化function spiderLinks(currentUrl, body, nesting) {
    let promise = Promise.resolve();
    if (nesting === 0) {
        return promise;
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
 
    links.forEach(link => {
        "use strict";
        promise = promise.then(() => spider(link, nesting - 1));
    });
 
    return promise;}
function spider(url, nesting) {
    const filename = utilities.urlToFilename(url);
 
    return readFile(filename, "utf8")
        .then(
            body => spiderLinks(url, body, nesting),
            err => {
                "use strict";
                if (err.code !== 'ENOENT') {
                    /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
                    throw err;
                }
                return download(url, filename)
                    .then(body => spiderLinks(url, body, nesting));
            }
        );}
spider(process.argv[2], 1)
    .then(() => {
        "use strict";
        console.log('Download complete');
    })
    .catch(err => {
        "use strict";
        console.log(err);
    });

可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。

在设计api的时候,应该支持两种方式,及支持callback,又支持promise

function asyncDivision(dividend, divisor, cb) {
    return new Promise((resolve, reject) => {
        "use strict";
        process.nextTick(() => {
            const result = dividend / divisor;
            if (isNaN(result) || !Number.isFinite(result)) {
                const error = new Error("Invalid operands");
                if (cb) {
                    cb(error);
                }
                return reject(error);
            }
 
            if (cb) {
                cb(null, result);
            }
            resolve(result);
        });
    });}
asyncDivision(10, 2, (err, result) => {
    "use strict";
    if (err) {
        return console.log(err);
    }
    console.log(result);});
asyncDivision(22, 11)
    .then((result) => console.log(result))
    .catch((err) => console.log(err));

4.Generator

Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。

(spider_v9.js)

const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");
 
const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);
 
function* download(url, filename) {
    console.log(`Downloading ${url}`);
 
    const response = yield request(url);
    console.log(response);
 
    const body = response[1];
    yield mkdirp(path.dirname(filename));
 
    yield writeFile(filename, body);
 
    console.log(`Downloaded and saved ${url}`);
    return body;}
function* spider(url, nesting) {
    const filename = utilities.urlToFilename(url);
 
    let body;
 
    try {
        body = yield readFile(filename, "utf8");
    } catch (err) {
        if (err.code !== 'ENOENT') {
            throw err;
        }
        body = yield download(url, filename);
    }
 
    yield  spiderLinks(url, body, nesting);}
function* spiderLinks(currentUrl, body, nesting) {
    if (nesting === 0) {
        return nextTick();
    }
 
    const links = utilities.getPageLinks(currentUrl, body);
 
    for (let i = 0; i < links.length; i++) {
        yield spider(links[i], nesting - 1);
    }}
/// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息co(function* () {
    try {
        yield spider(process.argv[2], 1);
        console.log('Download complete');
    } catch (err) {
        console.log(err);
    }});


本文由职坐标整理发布,欢迎关注职坐标Node.js频道,学习更多数据库知识!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程