Node.js中的Stream
输入流(或可读流 readable stream),就是流中有数据,我们从里面读取数据。输入流负责读取数据,我们只需要从输入流中读取数据。
输出流:它负责向目的地写入数据,而我们只需要向输出流中写入数据。
双向流: 即可以从它里面读取数据,也可以向它里面写入数据
转换流,给它一个流,它把流里面的内容转换一下,然后再把流输出,流的性质是不变的,流的内容发生了变化,通常转化的是输入流
输入流和输出流是相对于计算机内存而言的,输入流,就是把数据读取到内存中,输出流,则是把内存中的数据写入到目的地。
我们既可以使用Node.js提供的流,比如,fs.createReadStream 就创建了读取文件的输入流,也可以创建自己的流。通常来说,都是先使用Node.js提供的流,那就使用fs模块提供的输入,输出流。
fs.createReadStream() 创建输入流,输入流负责读取数据,所以它接受一个必须的参数,要读取的数据。
fs.createReadStream('./data.txt') 就是从当前文件下的data.txt中读取数据。我们程序需要做的就是从输入流中读取想要的数据,在Node.js中从输入流中读取数据,也有多种方式。
这要从输入流的两种模式说起。输入流有两种模式,pause模式,flow模式,pause模式是默认模式,就是创建输入流后,它处于暂停状态,程序不会从输入流中读取数据,我们需要手动地从输入流中读取数据。
flow模式,则是程序自动地从输入流中读取数据,我们只需要决定读取到数据后再怎么处理数据就可以了。
这时,flow模式,可能是我们想要的。
怎么从pause模式切换到flow模式呢?给输入流注册data事件,流就自动转化成flow模式,那再data事件注册一个事件处理函数,处理数据,就可以实现从输入流中读取数据了。
创建一个文件夹,新建一个data.txt文件和read.js 文件,data.txt文件写一些文字。read.js文件如下:
const fs = require('fs');
const readable = fs.createReadStream('./data.txt');
readable.on('data', data => {
console.log(data);
})
node read.js 执行程序,控制台输出了buffer, 就是一些二进制数据,默认情况下,从输入流中读取到的内容都是buffer,我们需要手动转化成字符串,调用toString()方法就可以了。console.log(data.toString()) 。
假设想用pause模式来读取数据,那就要手动调用输入流的read() 方法了,但还要注意,只有在输入流中有数据的时候,才能调用read() 方法,所以要在readable 事件处理函数中调用read()事件, read()方法,如果读取不到数据,就会返回null
const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
readableStream.on('readable', () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log(chunk.toString());
}
})
除了使用事件的方式来从输入流中读取数据,还可以使用异步迭代器(for await ... of )来消费输入流(从输入流中读取数据),因为输入流是异步可迭代对象
const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk.toString());
}
}
logChunks(readableStream);
当然它的底层还是监听readable 事件。除了在异步迭代器,直接处理数据,也可以把流中的数据暂时存储起来,以便日后消费。
const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
readableStream.setEncoding('utf-8');
async function readableToString(readable) {
let result = '';
for await (const chunk of readable) {
result += chunk;
}
return result;
}
readableToString(readableStream).then(console.log);
如果要处理异常,可以用try/catch 把for await 的处理包起来。说完了输入流,再说输出流。
fs.createWriteStream 创建一个输出流,输出流,就是把数据输出到什么地方,因此,它也接受一个参数,就是输出的目的地。我们要做的就是向输出流中写放数据,要调用write() 方法。
const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
const writeStream = fs.createWriteStream('./result.txt');
readableStream.on('data', data => {
writeStream.write(data);
})
write() 返回true or false, true表示写入成功,你可以继续写入数据。false则是,写入出错了,你不能继续写入了。至于什么时候能再写,输出流触发drain事件。
只要输出流触发了drain事件,就证明,可以继续向输出流中写入数据了。所以真正安全的做法,向输出流写入数据的时候,还要判断true or false, 并监听 drain 事件。
const fs = require('fs');
const util = require('util');
const stream = require('stream');
const {once} = require('events');
const readableStream = fs.createReadStream('./data.txt');
const writeStream = fs.createWriteStream('./result.txt');
const finished = util.promisify(stream.finished);
async function writeIterableToFile(readable, writable) {
for await (const chunk of readable) {
if (!writable.write(chunk)) {
await once(writable, 'drain');
}
}
writable.end();
// Wait until done. Throws if there are errors.
await finished(writable);
}
writeIterableToFile(readableStream, writeStream)
end() 方法,就表示,向输出流中写完数据了,不会再写了。finished 事件,则中输出流,把所有的数据都写入到的目的地中。当我们手动去读取和写入文件时候,处理有点麻烦,这就用到pipe()方法。上面的可以写成。
const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
const writeStream = fs.createWriteStream('./result.txt');
readableStream.pipe(writeStream);
pipe() 操作,前一个的输出变成后面一个的输入。readableStream输入流的输出,就是读取到的数据,我们就是要把这些数据写入到输出流中,所以它正好是输出流的输入,因此,就可以用pipe把这两个链接起来。pipe()的操作就相当于
readableStream.on("data", chunk => {
// 自动处理了drain事件。
writeStream.write(chunk);
});
readableStream.on("end", () => {
writeStream.end();
});
pipe() 也是把输入流的模式转换成了flow模式。
现在可以创建自已的输入,输出流了。还是有多种方法。先看输入流的创建,输入流中永远都是存储数据,要不然,我们没有办法从其里面读取数据。
1,直接创建一个Stream.Reable()对象,然后向里面push 数据。push(null) 表示不再向输入流中push数据。
const Stream = require('stream');
const readableStream = new Stream.Readable();
readableStream.push('ping');
readableStream.push('pong!');
readableStream.push(null);
当向输入流中push 数据的时候,它都是放到内存的buffer中,如果没有消费掉这些数据,内存就会占满,push不进去了。
2, Readable.from() 从一个可迭代对象中创建一个输入流。
async function* gen() {
yield 'hello';
yield 'stream';
}
const readableFromIterator = Stream.Readable.from(gen());
3,实现 _read()方法。_read() 方法,就是表示,把要读取的数据放到输入流中,不要和前面的read()搞混了,read() 是从输入流读取数据到程序中处理。
_read() 则是把要读取的数据放入到输入流中,这样我们才能调用read()方法来从里面消费数据. _read() 是node.js 自己调用的,也可以看到输入流,其实是要我们要读取的数据的一个抽象。
我们要读取哪一个文件的数据,就创建哪一个文件的输入流,可以把输入流想像读取的源文件。
怎么把数据放入到输入流中,还是调用push方法。在_read()方法中调用push()方法。
const stream = require('stream');
const data = require('./result'); // json 数据
class JsonDataReadable extends stream.Readable {
readIndex = 0;
_read(size) { // 读取的默认大小
let okToSend = true;
while (okToSend) {
okToSend = this.push(data.text.substr(this.readIndex, size));
this.readIndex += size;
if (this.readIndex >data.text.length) {
this.push(null);
okToSend = false;
}
}
}
}
{
"text": "The return value of the pipe() method is the destination stream, which is a very convenient thing that lets us chain multiple pipe() calls, like this:"
}
然后在这个js 文件写。
const fs = require('fs');
const Reader = new JsonDataReadable();
const fileWriter = fs.createWriteStream('output.txt');
Reader.pipe(fileWriter);
创建输出流: 是向目的地写入数据,当使用目的地创建一个输出流的时候,它就会自动地向目的地写数据,写数据调用的是_write() 方法。
只要输出流中有数据,它才会向目的地写数据,要把内存中的数据写入到输出流中,那就调用write()方法。
write()方法,是将内存中的数据写入到输出流中。
应该是存储到buffer中,流中的数据写入到目的地,就是把buffer中的数据输出到目的地就中_write()方法。_write()方法,是内部的接口,表示怎么把数据写入到目的地。显示_write()方法。
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString()); // 写到控制台上
next(); // 表示写入成功
}
writableStream.write('hello, ');
writableStream.end('world!');