Node.js教程 Node.js中的drain事件浅析
沉沙 2018-06-13 来源 : 阅读 1368 评论 0

摘要:本篇Node.js教程对drain事件进行了详细的探究,希望阅读本篇文章以后大家有所收获,帮助大家对Node.js的理解更加深入。

socket.on('drain', function(){
    console.log('drain event fired.');
});
 
socket.write('some data.');

实际放到线上测试的时候发现,在一些情况下,drain 事件真的会被触发,那到底什么时候会触发 drain 事件呢?drain 事件能用来做什么呢?本着打破砂锅问到底的精神,本文决定探究一番。


探究

最简单的办法就是查文档。因为写的是网络请求相关的代码,那么就先翻越 net 和 socket 相关的部分。在 Node.js 官方文档中,对于 socket.write 有这么一部分描述:

Returns true if the entire data was flushed successfully to the kernel buffer. Returns false if all or part of the data was queued in user memory. 'drain' will be emitted when the buffer is again free.

也就是说,drain 事件是和 socket.write 的返回值强关联的,那么我们可以做一个简单的实验(只写关键部分):

socket.on('drain', function(){
    console.log('drain event fired.');
});
 
var ret = socket.write('some data.');
 
console.log('write data returned %s.', ret);

可是无论怎么运行这部分代码,返回值总是 true,drain 事件没有被触发。那为啥线上就能触发呢?按照文档所说,只有全部或者部分数据被缓冲在了内存里面才会返回 false。那问题又来了,什么时候数据才会被缓冲呢?

既然是被缓冲了,那最先猜测到就是数据流量太大。就像每天上下班高峰期的文一西路那样,一旦车流量太大,前面的路口塞满了,交警就会让后面的车停下来。

好,那我们加大“车”流量(为节省篇幅,部分代码省略):

服务端代码:

var net = require('net');
net.createServer(function(socket){
    // do something
}).listen(6666, '127.0.0.1');

客户端代码:

var net = require('net');
function writeToRemote(){
    var client = new net.Socket();
    client.connect(6666, '127.0.0.1', function(){
        if(!client.write('some data by ziying.')){
            console.log('write returned false.');
        }
    });
    client.on('drain', function(){
        console.log('drain event fired.');
    });
    client.on('data', function(){
        client.destroy();
    });
}
// 加大流量for(var i = 0; i < 1000; i++){
    writeToRemote();
}

但运行多次之后发现仍旧没有看到任何 drain 事件的迹象。难道次数不够?随着继续增大 i 的最大值,直到遇到(libuv) kqueue(): Too many open files in system的错误时候,仍旧没看到 drain 事件。

接下来看 Node.js 源代码。

因为 socket.write 实际上是调用的 Stream.write(参考此处源代码),最后我们在 Stream.write的代码中找到了一丝端倪:

var ret = state.length < state.highWaterMark;// we must ensure that previous needDrain will not be reset to false.if (!ret)
  state.needDrain = true;
// something else
return ret;

可以看到当要写的数据的长度大于 highWaterMark (字面理解:高水位线)的时候,那么 Stream.write 就会返回 false,也就会触发 drain 事件了。

那这个高水位线具体是多少呢?可以继续看代码:

var hwm = options.highWaterMark;var defaultHwm = this.objectMode ? 16 : 16 * 1024;this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

默认值是 16KB,看来还是挺大的啊。所以回想一下刚才我们的实验程序,一个是写的数据比较小,另外一个是实验代码中的服务器端没有复杂逻辑,数据处理的也比较快,我们仍旧拿刚才的车流量的例子,虽然车很多很多,但是如果每辆车都开得飞快,那路也不会堵。只有当一些车比较慢,影响到了后面车的速度的时候,整体速度就会下来,就变堵了。

根据这个思路,我们换成下面这个实验:

var http = require('http');var fs = require('fs');
 
 
http.createServer(function(req, res) {
    // 哈哈,这个代码写于圣诞,于是就用了圣诞的歌曲,大家圣诞快乐哦!
    var filePath = './Christmas.mp3';
    var stat = fs.statSync(filePath);
 
    response.writeHead(200, {
        'Content-Type': 'audio/mpeg',
        'Content-Length': stat.size
    });
 
    var readStream = fs.createReadStream(filePath);
    readStream.on('data', function(data) {
        res.write(data);
    });
 
    res.on('drain', function() {
      console.log('drain event fired.');
    });
 
    readStream.on('end', function() {
        res.end();
    });
}).listen(6969);

这里我们启动了一个简单的 http server,任何 requset 过来,都会返回一个 5M 大小的 圣诞歌曲的内容。然后我们对这个 http server 发起 1000 次 GET 请求。果然不出所料,还没等所有请求发完,一堆的 drain event fired 日志出现了:

Node.js教程 Node.js中的drain事件浅析


如果对 res.write 的返回值做下日志,也会发现返回了很多 false。

原因也很容易想到,硬盘读取这个 MP3 文件的速度(测试环境为 RMBP 的 SSD 硬盘)一般都会快于将数据通过 HTTP Response 返回给用户(即便是 localhost 的访问,更不用说外网复杂错综的网络环境了),所以,当 MP3 很快就被读取过来,但又没有很快的将数据写回,那么这个 Stream 中的 data 就被缓存了。于是,很自然而然的设想,在这个小应用中,也许这么做并没有什么,但当应用的访问量逐渐增大的时候,这个问题就可能会爆发,比如造成个内存泄露啊之类的。

怎么用

那上面的代码该怎么改进呢?

var readStream = fs.createReadStream(filePath);
readStream.on('data', function(data) {
    if(!res.write(data)){
        readStream.pause();
    }
});
 
res.on('drain', function() {
    readStream.resume();
});

也就是说,当 write 返回 false 的时候,我们暂停读取流,当缓存的数据清空之后,我们再继续读取流,相当于我们根据输出流来对读取流做限流。反过来,如果写入流快于读取流,我们也可以对写入流限流。

真的是这样吗?

刚才提到了,如果我们没做这部分处理的话,应该很容易造成内存泄露,那我们不妨做个实验验证下。使用上面两段代码分别进行不带限流和带限流的实验,每隔 1s 使用 process.memoryUsage() 打印出来内存使用(我们在启动 server 之后就开始打印内存数据):

xxx.listen(PORT, function(){
 
  setInterval(function(){
    var memoryUsage = process.memoryUsage();
    // 用这个格式是为了方便我使用 Markdown 生成表格导入到 Numbers
    console.log('| %s | %s | %s |', memoryUsage.rss, memoryUsage.heapTotal, memoryUsage.heapUsed);
  }, 1000);
 
});

最终得出了如下数据:

Node.js教程 Node.js中的drain事件浅析

限流

Node.js教程 Node.js中的drain事件浅析


没有限流


可以看到,内存使用其实差不多......

难道前面猜想错了么?最简单的办法,继续看源代码。

看下 fs.createReadStream 的代码,就可以知道它的工作方式是先在内存中准备一段 Buffer,然后在 fs.read() 读取文件到这个 Buffer 中,完成一次读取时,则从 Buffer 中通过 slice 方法取出那个数据作为一个小 Buffer 对象,再通过 data 事件传给调用方[1]。

再看下 pause 具体是怎么实现的:

Readable.prototype.pause = function() {
  if (false !== this._readableState.flowing) {
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};

pause 方法设置了一个状态位。在 flow 方法中,如果是 paused 的状态,那么就不再从文件流中读取数据了:

function flow(stream) {
  var state = stream._readableState;
  debug('flow', state.flowing);
  if (state.flowing) {
    do {
      var chunk = stream.read();
    } while (null !== chunk && state.flowing);
  }
}

所以在 pause 的时候,文件的数据流依旧在 Buffer 中。

回头看下我们这个例子,其实是写比较慢,如果我们没有对写入的返回值做判断的话,Writable Stream 本身也会把多余的数据缓存起来。具体可以看 WriteOrBuffer 这个方法的实现:

if (state.writing || state.corked) {
    // buffer data
} else {
    doWrite();
}

可以看出这里如果之前的写入没有完成,那么会把需要写入的数据缓存起来。

这样就不难理解了,如果我们采用限制读取流的方案,那么数据缓存在读取流的 Buffer 里,如果我们采取不限制读取流的方案,那么数据缓存在写入流的 Buffer 里,总之这部分数据都是要被缓存,只是缓存到不同的流的 Buffer 而已,所以这也能解释了为啥我们的测试结果,内存占用基本没有差别了。

更好的解决方案

其实,Node.js 里面提供了更好的解决方案,也就是 pipe。

我们将上面的代码继续改造下:

var readStream = fileSystem.createReadStream(filePath);readStream.pipe(res);

是不是更简单了呢?

我们看下官方代码是怎么实现的(缩减版):

  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);
 
 
  src.on('data', ondata);
  function ondata(chunk) {
    var ret = dest.write(chunk);
    if (false === ret) {
      src.pause();
    }
  }

是不是和我们自己的实现差不多呢?当然官方代码处理的更加细致,对很多情况做了判断,感兴趣的同学不妨阅读看看:)

该怎么用?

看到这里,大家可能会问,那按照这么说,drain 就完全没用了是么?非也非也。

再回想我们刚才的案例,我们的写入流是以 HTTP Response 的形式返回给用户,但如果写入流是我们的一个服务呢?比如我们需要往一台 Redis Server 里面插入大量数据,而这台 Redis 又承担对外提供服务的艰巨任务,那么,为了保证我们写入的同时这台 Server 依旧能较好的对外服务,很自然的就可以想到我们可以对写入流做限流。比如node_redis 这个模块里面的一个 example:

'use strict';
var redis = require('../index'),
    client = redis.createClient(),
    remaining_ops = 100000, paused = false;
function op() {
    if (remaining_ops <= 0) {
        console.error('Finished.');
        process.exit(0);
    }
 
    remaining_ops--;
    client.hset('test hash', 'val ' + remaining_ops, remaining_ops);
    if (client.should_buffer === true) {
        console.log('Pausing at ' + remaining_ops);
        paused = true;
    } else {
        setTimeout(op, 1);
    }
}
 
client.on('drain', function () {
    if (paused) {
        console.log('Resuming at ' + remaining_ops);
        paused = false;
        process.nextTick(op);
    } else {
        console.log('Got drain while not paused at ' + remaining_ops);
    }
});
 
op();

在这个例子中,当写入流开始缓存的时候,我们就停止写入了,等 buffer flush 完,我们再继续写入。通过这种形式,我们可以控制发送命令的速率。

小结

其实在我们平时写代码的过程中,并不需要刻意用到 drain 事件。Node.js 本身已经帮我们处理了很多细节。对于流的处理,推荐大家直接用简单方便的 pipe 方法。而drain 事件比较适合用在一些需要自己手工处理限流的场景。


本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注WEB前端Node.js频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程