Node.js Stream 介绍

微信扫一扫,分享到朋友圈

Node.js Stream 介绍

We should have some ways of connecting programs like garden hose–screw in another segment when it becomes when it becomes necessary to massage data in another way. This is the way of IO also.



Doug McIlroy. October 11, 1964

为什么应该使用流

Node.js中,I/O都是异步的,所以在和硬盘以及网络的交互过程中会涉及到传递回调函数的过程。你之前可能会写出这样的代码:

var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(3000);

上面的这段代码并没有什么问题,但是在每次请求时,我们都会把整个data.txt文件读入到内存中,然后再把结果返回给客户端。想想看,如果data.txt文件非常大,在响应大量用户的并发请求时,程序可能会消耗大量的内存,这样很可能会造成用户连接缓慢的问题。

其次,上面的代码可能会造成很不好的用户体验,因为用户在接收到任何的内容之前首先需要等待程序将文件内容完全读入到内存中。

所幸的是,(req,res)参数都是流对象,这意味着我们可以使用一种更好的方法来实现上面的需求:

var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(3000);

在这里, .pipe()
方法会自动帮助我们监听data和end事件。上面的这段代码不仅简洁,而且data.txt文件中每一小段数据都将源源不断的发送到客户端。

除此之外,使用.pipe()方法还有别的好处,比如说它可以自动控制后端压力( back pressure
),以便在客户端连接缓慢的时候node可以将尽可能少的缓存放到内存中。

想要将数据进行压缩?我们可以使用相应的流模块完成这项工作!

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(3000);

通过上面的代码,我们成功的将发送到浏览器端的数据进行了gzip压缩。我们只是使用了一个oppressor模块来处理这件事情。

一旦你学会使用流API,你可以将这些流模块像搭乐高积木或者像连接水管一样拼凑起来,从此以后你可能再也不会去使用那些没有流API的模块获取和推送数据了。

为了方便理解,不使用流和使用流两种场景可通过下图直观感受:

不使用流
使用流

Stream的类型

在Node.js中,有五种类型的流:

  • Readable – 可读取数据的流(例如 fs.createReadStream())
  • Writable – 可写入数据的流(例如 fs.createWriteStream())
  • Duplex – 可读又可写的流(例如 net.Socket)
  • Transform – 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())
  • Classic – 无论何时,只要一个流对象注册了一个data监听器,它就会自动的切换到classic模式,并且根据旧API的方式运行。

Readable Streams

可读流可以产出数据,通过 .pipe 接口可以将其数据传送到一个 writable、transform 或者 duplex流中。

常见的可读流包括:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin

创建一个Readable streams

var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('abc ');
rs.push('123\n');
rs.push(null);
rs.pipe(process.stdout);
// 运行结果
// abc 123

在上面的代码中rs.push(null)的作用是告诉可读流输出的数据结束了。

我们可以通过定义一个_read()函数来实现按需推送数据:

var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
// 运行结果如下:
// abcdefghijklmnopqrstuvwxyz

上面代码中我们将字母a-z推进了rs中,但是只有当数据消耗者出现时,数据才会真正实现推送。_read函数有一个可选参数size来指明消耗者想要读取多少比特的数据。

为了说明只有在数据消耗者出现时,_read函数才会被调用,我们可以将上面的代码简单的修改一下:

var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

运行上面的代码,我们可以发现如果我们只请求5比特的数据,那么_read只会运行5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

消耗一个Readable streams

大部分时候,将一个可读流直接pipe到另一种类型的流(writable、transform 或者 duplex)或者使用through或者concat-stream创建的流中,是一件很容易的事情。但有时我们也会需要直接消耗一个可读流。

// consume0.js
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});

上面代码运行结果如下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
Buffer(4) [Uint8Array] [ 97, 98, 99, 10 ]
Buffer(4) [Uint8Array] [ 100, 101, 102, 10 ]
Buffer(4) [Uint8Array] [ 103, 104, 105, 10 ]
null

当数据可用时,readable事件将会被触发,此时你可以调用.read()方法来从缓存中获取这些数据。当流结束时,.read()将返回null,因为此时已经没有更多的字节可以供我们获取了。

你也可以告诉 .read()
方法来返回n个字节的数据:

// consume1.js
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});

上面代码运行结果如下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
Buffer(3) [Uint8Array] [ 97, 98, 99 ]
Buffer(3) [Uint8Array] [ 10, 100, 101 ]
Buffer(3) [Uint8Array] [ 102, 10, 103 ]
Buffer(3) [Uint8Array] [ 104, 105, 10 ]
// 现在每次打印三个字节的数据,直到把缓存中的数据全部打印完毕。

最后再举一个消费可读流的例子:

const { Readable } = require('stream');
class MyReadable extends Readable {
constructor(iterator) {
super();
}
_read() {
this.push('a');
}
}
const readable = new MyReadable();
// 通过事件监听消费可读流
readable.on('data', data => process.stdout.write(data));
readable.on('end', () => process.stdout.write('DONE'));
// 或者通过pipe()替代
// readable.pipe(process.stdout);

上面代码执行结果将持续不断的输出字符 a
到终端。

  • _read方法从底层系统读取具体数据
  • _read方法中,通过调用push(data)将数据放入可读流中供下游消耗。
  • _read方法中,可以同步调用push(data),也可以异步调用。
  • _read方法中,通过调用push(null)来结束可读流。
  • 可读流一旦结束,便不能再调用push(data)添加数据。

Writable Streams

可写流只能流进不能流出,常见的可写流包括:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

创建一个Writable streams

只需要定义一个 _write(chunk, enc, next)
方法,就可以将一个readable流的数据释放到其中。

// write0.js
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);

上面代码运行结果:

$ (echo abc; sleep 1; echo 123) | node write0.js
Buffer(4) [Uint8Array] [ 97, 98, 99, 10 ]
Buffer(4) [Uint8Array] [ 49, 50, 51, 10 ]

在从一个readable流向一个writable流传数据的过程中,数据会自动被转换为Buffer对象,除非你在创建writable流的时候指定decodeStrings参数为false,Writable({decodeStrings: false})。

如果你需要传递对象,需要指定objectMode参数为true,Writable({ objectMode: true })。

向可写流中写数据

向一个writable流中写入数据,只需要调用 write(data)
即可。

writableStream.write('abc 123');

为了告诉一个writable流你已经写完毕了,只需要调用.end()方法。也可以使用.end(data)在结束前再写一些数据。

// write1.js
var fs = require('fs');
var ws = fs.createWriteStream('data.txt');
ws.write('abc ');
setTimeout(function () {
ws.end('123\n');
}, 1000);

如果你需要调整内部缓冲区大小,那么需要在创建可写流对象时设置highWaterMark。在调用.write()方法返回false时,说明写入的数据大小超过了该值。

为了避免读写速率不匹配而造成内存上涨,可以监听drain事件,等待可写流内部缓存被清空再继续写入。

一个可写流的完整示例

const Writable = require('stream').Writable;
const writable = Writable();
writable._write = function (data, enc, next) {
process.stdout.write(data.toString().toUpperCase());
process.nextTick(next);
}
writable.on('finish', () => process.stdout.write('DONE'));
writable.write('a' + '\n');
writable.write('b' + '\n');
writable.write('c' + '\n');
writable.end();
writable.write(data)

Duplex Streams

双工流(Duplex)是同时实现了 Readable 和 Writable 接口的流,常见的双工流有:

  • TCP sockets
  • zlib streams
  • crypto streams

创建一个Duplex Streams

var Duplex = require('stream').Duplex;
var duplex = Duplex();
duplex._read = function () {
var date = new Date();
this.push( date.getFullYear().toString() );
this.push(null)
};
duplex._write = function (buf, enc, next) {
console.log( buf.toString() );
next()
};
duplex.on('data', data => console.log( data.toString() ));
duplex.write('the year is');
duplex.end();
// 或者使用pipe
// duplex.pipe(duplex);
// 执行结果
// the year is
// 2020
  • 上面代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
  • 同时代码又实现了_write方法,可作为下游去消耗数据。

Transform Streams

转换流(Transform)是一种 Duplex 流,但它的输出与输入是相关联的。 与 Duplex 流一样, Transform 流也同时实现了 Readable 和 Writable 接口。常见的转换流有:

  • zlib streams
  • crypto streams

创建一个转换流

双工流中的可读端和可写端的数据是隔离的,在转换流中可写端写入的数据经变换后会自动添加到可读端。Tranform继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法。

// transform.js
const Transform = require('stream').Transform;
const transform = Transform();
transform._transform = function(buf, enc, next) {
var res = buf.toString().toUpperCase();
this.push(res);
next();
}
transform.on('data', data => process.stdout.write(data));
transform.write('hello, ');
transform.write('world!');
transform.end();
// 执行结果
// HELLO,WORLD!

Classic Streams

Classic Streams是一个古老的接口,最早出现在Node 0.4中。上文提及的“Stream.Readable”接口均是从 Node 0.9.4 开始才有,因此早期往往需要对其进行封装扩展才能更好地用来开发。

只要一个流对象注册了一个data监听器,它就会自动的切换到classic模式,并且根据旧API的方式运行。

Classic Readable Streams

Classic Streams可以当作一个带有 .pipe 接口的事件发射器(event emitter),当它要为消耗者提供数据时会发射“data”事件,当要结束生产数据时,则发射“end”事件。

只有当设置 stream.readable 为 true 时,.pipe 接口才会将当前流视作readable流。

// classic0.js
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
} else {
stream.emit('data', String.fromCharCode(c));
}
}, 100);
stream.pipe(process.stdout);
// 去掉命令行尾部的 %
process.on('exit', function () {
console.log();
});
// 执行结果输出A到J字符。

为了从一个Classic Readable流中读取数据,你可以注册data和end监听器。下面是一个使用旧readable流方式从process.stdin中读取数据的例子:

process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});

上面代码运行结果:

$ (echo abc; sleep 1; echo 123) | node classic1.js
<Buffer 61 62 63 0a>
<Buffer 31 32 33 0a>
__END__

Classic Readable流拥有两个可选的.pause()和.resume()方法来暂停一个流,如果你想要使用.pause()和.resume()方法,你应该使用through模块来帮助你处理缓存。

Classic Writable Streams

Classic Writable流需要定义.write(buf),.end(buf),以及.desctory()方法。

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
stream.writable = true;
// 接收上游的数据
stream.write = function(data) {
console.log(data);
return true;
}
stream.end = function (data) {
console.log(data, '__END__');
}
stream.desctory = function () {
console.log('desctory')
}
//stream.emit('end', '__END__');

ObjectMode

对于可读流来说,push(data) 时,data 的类型只能是 String、Buffer或Uint8Array,且消耗时 data 事件输出的数据类型都为 Buffer;

对于可写流来说,write(data) 时,data 的类型同样是 String、Buffer或Uint8Array,_write(data) 调用时所传进来的 data 类型都为 Buffer。

也就是说,Node.js创建的流中的数据默认情况下都是Buffer类型。产生的数据一放入流中,便转成Buffer被消耗;写入的数据在传给底层写逻辑时,也被转成Buffer类型。当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null)。 这些流会以“对象模式”进行操作。

Readable Streams未设置objectMode时:

const Readable = require('stream').Readable;
const readable = Readable();
readable.push('a');
readable.push('b');
readable.push(null);
readable.on('data', data => console.log(data));
// 输出:
// <Buffer 61>
// <Buffer 62>

Readable Streams设置objectMode时:

const Readable = require('stream').Readable;
const readable = Readable({ objectMode: true });
readable.push('a');
readable.push('b');
readable.push({});
readable.push(null);
readable.on('data', data => console.log(data));
// 输出:
// a
// b
// {}

pipe

管道用来连接上下游,它接受一个源头readable并将数据输出到一个可写的流writable中:

readable.pipe(writable)

writable是一个可写流Writable对象,上游调用其write方法将数据写入其中。 writable内部维护了一个写队列,当这个队列长度达到某个阈值(state.highWaterMark)时, 执行write()时返回false,否则返回true。当可以继续写入数据到流时会触发 ‘drain’ 事件。

于是上游可以根据write()的返回值在流动模式和暂停模式间切换:

readable.on('data', function (data) {
if (false === writable.write(data)) {
readable.pause()
}
})
writable.on('drain', function () {
readable.resume()
})

以上便是pipe方法的核心逻辑。

当write()返回false时,调用readable.pause()使上游进入暂停模式,不再触发data事件。 但是当writable将缓存清空时,会触发一个drain事件,再调用readable.resume()使上游进入流动模式,继续触发data事件。

const stream = require('stream');
var c = 0;
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(() => {
var data = c < 6 ? String.fromCharCode(c + 65) : null
console.log('push', ++c, data)
this.push(data)
})
}
});
const writable = stream.Writable({
highWaterMark: 2,
write: function (chunk, enc, next) {
console.log('write', chunk)
}
});
readable.pipe(writable);
// 输出
// push 1 A
// write <Buffer 41>
// push 2 B
// push 3 C
// push 4 D

虽然上游一共有6个数据(ABCDEF)可以生产,但实际只生产了4个(ABCD)。 这是因为第一个数据(A)迟迟未能写完(未调用next()),所以后面通过write方法添加进来的数据便被缓存起来。 下游的缓存队列到达2时,write返回false,上游切换至暂停模式。 此时下游保存了AB。 由于Readable总是缓存state.highWaterMark这么多的数据,所以上游保存了CD。 从而一共生产出来ABCD四个数据。

为了更直观理解上面的代码,将上代码改造如下:

const { Readable, Writable } = require('stream');
const readable = Readable({
highWaterMark: 2
});
const writable = Writable({
highWaterMark: 3
});
readable._read = function () {
// 源源不断的生成数据 'a'
console.log('readable._read:', 'a');
this.push('a');
};
writable._write = function (chunk, enc, next) {
console.log('WRITABLE._WRITE:', chunk.toString());
setTimeout(() => {
next();
}, 1000)
}
// // 监听`data`事件,一次获取一个数据
readable.on('data', data => {
console.log('writable.highWaterMark:', data.toString());
if (false === writable.write(data)) {
readable.pause()
}
});
writable.on('drain', function () {
console.log('drain');
readable.resume();
});

上面代码执行结果:

readable._read: a
readable._read: a
writable.highWaterMark: a
WRITABLE._WRITE: a
readable._read: a
writable.highWaterMark: a
readable._read: a
writable.highWaterMark: a
readable._read: a
// 步骤一:
// readable._read: a 输出了5次,也就是把readable和writable分别定义的highWaterMark,2+3 的缓存空间写满了。
// writable.highWaterMark: a 输出了3次,表示writable的缓存空间3写满,此时writable.write(data)返回false,readable切换为暂停模式。
// WRITABLE._WRITE: a 输出了1次,表示第一个数据输出后,还未调用next()。
// 步骤三:
// 等待1秒后, 输出如下:
WRITABLE._WRITE: a
// 再等一秒后,输出如下:
WRITABLE._WRITE: a
// 此时writable的缓存空间3被清空,触发'drain'事件,readable切换为流动模式。输出:drain 同时重复步骤一。

以上便是pipe()方法的实现原理。

参考

微信扫一扫,分享到朋友圈

Node.js Stream 介绍

2020年11月编程语言排行:C、Python、Java

上一篇

3C认证曝光:华为Mate X2折叠屏新机支持66W快充

下一篇

你也可能喜欢

Node.js Stream 介绍

长按储存图像,分享给朋友