Node.js从入门到精通 Node.js Stream模块详解
沉沙 2018-07-25 来源 : 阅读 920 评论 0

摘要:本篇Node.js教程探讨了Node.js Stream模块,希望阅读本篇文章以后大家有所收获,帮助大家对Node.js的理解更加深入。

本篇Node.js教程探讨了Node.js Stream模块,希望阅读本篇文章以后大家有所收获,帮助大家对Node.js的理解更加深入。


背景

在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。譬如,shell通过管道|连接各部分,其输入输出的规范是文本流。

在Node.js中,内置的Stream模块也实现了类似功能,各部分通过.pipe()连接。

流的四种类型

Stream提供了以下四种类型的流:

var Stream = require('stream')

var Readable = Stream.Readablevar Writable = Stream.Writablevar Duplex = Stream.Duplexvar Transform = Stream.Transform

使用Stream可实现数据的流式处理,如:

var fs = require('fs')// `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出

// 如果使用`fs.readFile`则可能由于文件过大而失败

fs.createReadStream(bigFile).pipe(process.stdout)

Readable

创建可读流。

实例:流式消耗迭代器中的数据。

 

'use strict'

const Readable = require('stream').Readable

 

class ToReadable extends Readable {

  constructor(iterator) {

    super()

    this.iterator = iterator

  }

 

  // 子类需要实现该方法

  // 这是生产数据的逻辑  _read() {

    const res = this.iterator.next()

    if (res.done) {

      // 数据源已枯竭,调用`push(null)`通知流

      return this.push(null)

    }

    setTimeout(() => {

      // 通过`push`方法将数据添加到流中

      this.push(res.value + '\n')

    }, 0)

  }

}

 

module.exports = ToReadable

 

实际使用时,new ToReadable(iterator)会返回一个可读流,下游可以流式的消耗迭代器中的数据。

 

const iterator = function (limit) {

  return {

    next: function () {

      if (limit--) {

        return { done: false, value: limit + Math.random() }

      }

      return { done: true }

    }

  }

}(1e10)

 

const readable = new ToReadable(iterator)

// 监听`data`事件,一次获取一个数据

readable.on('data', data => process.stdout.write(data))

// 所有数据均已读完

readable.on('end', () => process.stdout.write('DONE'))

 

执行上述代码,将会有100亿个随机数源源不断地写进标准输出流。

创建可读流时,需要继承Readable,并实现_read方法。

· _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。

· 在_read方法中,通过调用push(data)将数据放入可读流中供下游消耗。

· 在_read方法中,可以同步调用push(data),也可以异步调用。

· 当全部数据都生产出来后,必须调用push(null)来结束可读流。

· 流一旦结束,便不能再调用push(data)添加数据。

可以通过监听data事件的方式消耗可读流。

· 在首次监听其data事件后,readable便会持续不断地调用_read(),通过触发data事件将数据输出。

· 第一次data事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中)。

· 当数据全部被消耗时,会触发end事件。

上面的例子中,process.stdout代表标准输出流,实际是一个可写流。下小节中介绍可写流的用法。

Writable

创建可写流。

前面通过继承的方式去创建一类可读流,这种方法也适用于创建一类可写流,只是需要实现的是_write(data, enc, next)方法,而不是_read()方法。

有些简单的情况下不需要创建一类流,而只是一个流对象,可以用如下方式去做:

 

const Writable = require('stream').Writable

 

const writable = Writable()// 实现`_write`方法

// 这是将数据写入底层的逻辑

writable._write = function (data, enc, next) {

  // 将流中的数据写入底层  process.stdout.write(data.toString().toUpperCase())

  // 写入完成时,调用`next()`方法通知流传入下一个数据  process.nextTick(next)

}

// 所有数据均已写入底层

writable.on('finish', () => process.stdout.write('DONE'))

// 将一个数据写入流中

writable.write('a' + '\n')

writable.write('b' + '\n')

writable.write('c' + '\n')

// 再无数据写入流时,需要调用`end`方法

writable.end()

 

· 上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()将data写入底层。

· 在_write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。

· next的调用既可以是同步的,也可以是异步的。

· 上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。

· 在end方法调用后,当所有底层的写操作均完成时,会触发finish事件。

Duplex

创建可读可写流。

Duplex实际上就是继承了Readable和Writable的一类流。
所以,一个Duplex对象既可当成可读流来使用(需要实现_read方法),也可当成可写流来使用(需要实现_write方法)。

 

var Duplex = require('stream').Duplex

var duplex = Duplex()

// 可读端底层读取逻辑

duplex._read = function () {

  this._readNum = this._readNum || 0

  if (this._readNum > 1) {

    this.push(null)

  } else {

    this.push('' + (this._readNum++))

  }

}

// 可写端底层写逻辑

duplex._write = function (buf, enc, next) {

  // a, b

  process.stdout.write('_write ' + buf.toString() + '\n')

  next()

}

// 0, 1

duplex.on('data', data => console.log('ondata', data.toString()))

 

duplex.write('a')

duplex.write('b')

 

duplex.end()

 

上面的代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
同时,又实现了_write方法,可作为下游去消耗数据。

因为它既可读又可写,所以称它有两端:可写端和可读端。
可写端的接口与Writable一致,作为下游来使用;可读端的接口与Readable一致,作为上游来使用。

Transform

在上面的例子中,可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端。
Tranform继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法。

 

'use strict'

 

const Transform = require('stream').Transform

 

class Rotate extends Transform {

  constructor(n) {

    super()

    // 将字母旋转`n`个位置

    this.offset = (n || 13) % 26

  }

 

  // 将可写端写入的数据变换后添加到可读端  _transform(buf, enc, next) {

    var res = buf.toString().split('').map(c => {

      var code = c.charCodeAt(0)

      if (c >= 'a' && c <= 'z') {

        code += this.offset

        if (code > 'z'.charCodeAt(0)) {

          code -= 26

        }

      } else if (c >= 'A' && c <= 'Z') {

        code += this.offset

        if (code > 'Z'.charCodeAt(0)) {

          code -= 26

        }

      }

      return String.fromCharCode(code)

    }).join('')

 

    // 调用push方法将变换后的数据添加到可读端

    this.push(res)

    // 调用next方法准备处理下一个    next()

  }

 

}

var transform = new Rotate(3)

transform.on('data', data => process.stdout.write(data))

transform.write('hello, ')

transform.write('world!')

transform.end()

// khoor, zruog!

 

objectMode

前面几节的例子中,经常看到调用data.toString()。这个toString()的调用是必需的吗?
本节介绍完如何控制流中的数据类型后,自然就有了答案。

在shell中,用管道(|)连接上下游。上游输出的是文本流(标准输出流),下游输入的也是文本流(标准输入流)。在本文介绍的流中,默认也是如此。

对于可读流来说,push(data)时,data只能是String或Buffer类型,而消耗时data事件输出的数据都是Buffer类型。对于可写流来说,write(data)时,data只能是String或Buffer类型,_write(data)调用时传进来的data都是Buffer类型。

也就是说,流中的数据默认情况下都是Buffer类型。产生的数据一放入流中,便转成Buffer被消耗;写入的数据在传给底层写逻辑时,也被转成Buffer类型。

但每个构造函数都接收一个配置对象,有一个objectMode的选项,一旦设置为true,就能出现“种瓜得瓜,种豆得豆”的效果。

Readable未设置objectMode时:

 

const Readable = require('stream').Readable

 

const readable = Readable()

 

readable.push('a')

readable.push('b')

readable.push(null)

 

readable.on('data', data => console.log(data))

 

输出:

<Buffer 61>

<Buffer 62>

Readable设置objectMode后:

 

const Readable = require('stream').Readable

 

const readable = Readable({ objectMode: true })

 

readable.push('a')

readable.push('b')

readable.push({})

readable.push(null)

 

readable.on('data', data => console.log(data))

 

输出:

a

b

{}

可见,设置objectMode后,push(data)的数据被原样地输出了。此时,可以生产任意类型的数据。

结出两种设计模式,并基于Stream构建一个为Git仓库自动生成changelog的应用作为示例。

 

本文由职坐标整理发布,欢迎关注职坐标Node.js频道,学习更多WEB前端知识!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程