Flutter异步编程-Stream
❝Stream可以说是构成Dart响应式流编程重要组成部分。还记得之前文章中说过的Future吗,我们知道每个Future代表单一的值,可以异步传送数据或异常。而Stream的异步工作方式和Future类似,只是Stream代表的是一系列的事件,那么就可能传递任意数据值,可能是多个值也可以是异常。比如从磁盘中读取一个文件,那么这里返回的就是一个Stream。此外Stream是基于事件流订阅的机制来运转工作的。
❞
1. 为什么需要Stream
首先,在Dart单线程模型中,要实现异步就需要借助类似Stream、Future之类的API实现。所以「Stream可以很好地实现Dart的异步编程」。此外,在Dart中一些异步场景中,比如磁盘文件、数据库读取等类似需要读取一系列的数据时,这种场景Future是不太合适的,所以在一些需要实现一系列异步事件时Stream就是不错的选择,「Stream提供一系列异步的数据序列」。换个角度理解「Stream就是一系列的Future组合,Future只能有一个异步响应,而Stream就是一系列的异步响应」。
//Futures实现
void main() {
Future.delayed(Duration(seconds: 1), () => print('future value is: 1'));
}
输出结果:
//Stream实现
void main() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (int value) {
return value + 1;
});
await stream.forEach((element) => print('stream value is: $element'));
}
输出结果: (输出结果是一直在运行的)
2. 什么是Stream
用官方的术语来说: 「Stream 是一系列异步事件的序列。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事件时它会立即给你,但是 Stream 则不会立即给你而是在它准备好时告诉你。」 Streams是异步数据的源,Stream提供了一种接收事件序列的方式。每个事件要么是数据事件(或称为流的元素),要么就是用于通知异常信息error事件。当Stream所有的事件发出以后,一个"done"结束事件将作为最后一个事件发出。实际上类似RX响应式流的概念。
2.1 单一订阅模型(Single-subscription)
2.2 广播订阅模型(Broadcast-subscription)
2.3 模型分析
StreamController
是创建Stream
对象主要方式之一每个 StreamController
都会有一个槽口(Sink), 也就是Stream事件的入口,通过Sink的add
将事件序列加入到StreamController
中。StreamController
类似一个生产者和消费者模型,它不知道什么时候会有事件从Sink槽口加进来,而对于外部订阅者也不知道何时有事件出来,所以对于外部订阅者只需要添加监听就好了。当有事件通过sink槽口加入到StreamController后,StreamController就开始工作,然后直到它输出数据。 需要注意的是从sink槽口加入的事件序列是有序的,监听器得到序列是和加入序列一致,也就是说StreamController处理并不会打乱事件序列顺序。 单一订阅者顾明思意就是只能有一个订阅者监听整个事件流,而对于广播订阅可以有若干个订阅者监听整个事件流,类似于广播通知的机制。
3. 如何使用Stream
3.1 创建Stream的方法
3.1.1 通过Stream构造器创建
「Stream.fromFuture」: 通过Future创建一个新的 「single-subscription(单一订阅)Stream」 , 当Future完成时触发 then
回调,然后就会把返回的value加入到StreamController
中, 并且还会添加一个Done
事件表示结束。若Future完成时触发onError
回调,则会把error加入到StreamController
中, 并且还会添加一个Done
事件表示结束。
factory Stream.fromFuture(Future<T> future) {
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
future.then((value) {//future完成时,then回调
controller._add(value);//将value加入到_StreamController中
controller._closeUnchecked();//最后发送一个done事件
}, onError: (error, stackTrace) {//future完成时,error回调
controller._addError(error, stackTrace);//将error加入到_StreamController中
controller._closeUnchecked();//最后发送一个done事件
});
return controller.stream;//最后返回stream
}
void main() {
Stream.fromFuture(Future.delayed(Duration(seconds: 1), () => 100)).listen(
(event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);//cancelOnError: true(表示出现error就取消订阅,之后事件将无法接收;false表示出现error后,后面事件可以继续接收)
}
输出结果:
「Stream.fromIterable」: 通过从一个集合中获取其数据来创建一个新的**single-subscription(单一订阅)Stream**
void main() {
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8])
.map((event) => "this is $event")//还可以借助map,fold,reduce之类操作符,可以变换事件流
.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
输出结果:
「Stream.fromFutures」:从一系列的Future中创建一个新的 「single-subscription(单一订阅)Stream」,每个future都有自己的data或者error事件,当整个Futures完成后,流将会关闭。如果Futures为空,流将会立刻关闭。
void main() {
var future1 = Future.value(100);
var future2 = Future.delayed(Duration(seconds: 1), () => 200);
var future3 = Future.delayed(Duration(seconds: 2), () => 300);
Stream.fromFutures([future1, future2, future3])
.reduce((previous, element) => previous + element)//累加所有future中的值
.asStream()
.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
输出结果:
「Stream.periodic:」 可以创建一个新的重复发射事件而且可以指定间隔时间的Stream,通过再StreamController的onResume方法中创建一个Timer对象,最后调用Timer的 「periodic方法。」
factory Stream.periodic(Duration period,
[T computation(int computationCount)]) {
Timer timer;
int computationCount = 0;
StreamController<T> controller;
// Counts the time that the Stream was running (and not paused).
Stopwatch watch = new Stopwatch();//创建Stopwatch用于计算Stream运行时间, 会一直运行不会停止
void sendEvent() {
watch.reset();
T data;
if (computation != null) {
try {
data = computation(computationCount++);
} catch (e, s) {
controller.addError(e, s);
return;
}
}
controller.add(data);
}
void startPeriodicTimer() {
assert(timer == null);
//创建Timer对象
timer = new Timer.periodic(period, (Timer timer) {
sendEvent();
});
}
controller = new StreamController<T>(
sync: true,
onListen: () {
watch.start();
startPeriodicTimer();
},
onPause: () {
timer.cancel();
timer = null;
watch.stop();
},
onResume: () {
assert(timer == null);
Duration elapsed = watch.elapsed;
watch.start();
timer = new Timer(period - elapsed, () {
timer = null;
startPeriodicTimer();
sendEvent();
});
},
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
return Future._nullFuture;
});
return controller.stream;
}
void main() {
Stream.periodic(Duration(seconds: 1), (value) => value + 100)
.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
输出结果:
3.1.2 通过StreamController创建
创建任意类型StremController,也就是sink槽口可以加入任何类型的事件数据
import 'dart:async';
void main() {
//1.创建一个任意类型StreamController对象
StreamController streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通过sink槽口添加任意类型事件数据
streamController.sink.add(100);
streamController.sink.add(100.121212);
streamController.sink.add('THIS IS STRING');
streamController.sink.close();//只有手动调用close方法发送一个done事件,onDone才会被回调
//3.注册监听
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
输出结果:
创建指定类型的StreamController, 也就是sink槽口可以加入对应指定类型的事件数据
import 'dart:async';
void main() {
//1.创建一个int类型StreamController对象
StreamController<int> streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通过sink槽口添加int类型事件数据
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手动调用close方法发送一个done事件,onDone才会被回调
//3.注册监听
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
输出结果:
3.1.3 通过async*创建
如果有一系列事件需要处理,也许会需要把它转化为 stream。这时候可以使用 「async」*** 和 yield** 来生成一个 Stream。
void main() {
generateStream(10).listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
Stream<int> generateStream(int dest) async* {
for (int i = 1; i <= dest; i++) {
yield i;
}
}
输出结果:
3.2 监听Stream
3.2.1 listen方法监听
监听Stream流主要就是使用 listen
这个方法,它有 onData(必填参数)
, onError(可选参数)
, onDone(可选参数)
, cancelOnError(可选参数)
onData: 接收到数据时触发回调 onError: 接收到异常时触发回调 onDone: 数据接收完毕触发回调 cancelOnError: 表示true(出现第一个error就取消订阅,之后事件将无法接收;false表示出现error后,后面事件可以继续接收)
StreamSubscription<T> listen(void onData(T data),
{Function onError, void onDone(), bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
3.2.2 async-await配合for或forEach循环处理
通过async-await配合for或forEach可以实现当Stream中每个事件到来的时候处理它,由于Stream接收事件时机是不确定,所以for或forEach循环退出的时候一般是Stream关闭或者完成结束的时候
void main() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (int value) {
return value + 1;
});
await stream.forEach((element) => print('stream value is: $element'));
}
输出结果:
3.3 Stream流的转换
Stream流的转换实际上是通过类似 map
、 take
、 where
、 reduce
、 expand
之类的操作符函数实现流的变换。实际上他们作用和集合中变化操作符意思是类似,所以这里由于篇幅问题就不一一展开,有了前面集合操作符函数基础,这里也是类似,只不过这边返回的是 Stream<T>
而已。不过这里需要特别说下 transform
操作符函数.
transform
操作符函数它能实现更多自定义的流变化规则, 它通过传入一个 StreamTransformer<T, S>
参数,最后返回一个 Stream<T>
. 也就是输入的流类型是 T
. 输出的是 S
,通过StreamTransformer
输出一个新的Stream流。
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
import 'dart:async';
void main() {
//1.创建一个int类型StreamController对象
StreamController<int> streamController = StreamController();
//2.通过sink槽口添加int类型事件数据
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手动调用close方法发送一个done事件,onDone才会被回调
//自定义StreamTransformer
final transformer = StreamTransformer<int, String>.fromHandlers(handleData: (value, sink) {
sink.add("this number is: $value");
});
//3.注册监听
streamController.stream
.transform(transformer)
.listen((event) => print("second listener: $event"), onDone: () => print('second listener: is done'));
}
输出结果:
3.4 Stream流的种类
其实这里Stream流的种类划分是基于Stream流的订阅模型来划分,所以那么这里Stream流的种类只有两种: 「Single-subscription(单一订阅)Stream、Broadcast-subscription(广播订阅)Stream」
3.4.1 Single-subscription(单一订阅)Stream
单一订阅Stream在整个流的生命周期中「只会有一个订阅者监听,「也就是listen方法只能调用一次,而且」第一次listen取消(cancel)后,不能重复监听,否则会抛出异常」。
import 'dart:async';
void main() {
//1.创建一个int类型StreamController对象
StreamController<int> streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通过sink槽口添加int类型事件数据
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手动调用close方法发送一个done事件,onDone才会被回调
//3.注册监听
streamController.stream.listen((event) => print(event), onDone: () => print('is done'));
streamController.stream.listen((event) => print(event), onDone: () => print('is done')); //不允许两次监听
}
输出结果:
3.4.2 「Broadcast-subscription」(「广播订阅」)Stream
Broadcast广播订阅模型Stream, 可以同时存在任意多个订阅者监听,无论是否有订阅者,它都会产生事件。所以中途进来的收听者将「不会收到」之前的消息。如果多个收听者要监听单一订阅Stream,需要使用 asBroadcastStream
转化成Broadcast广播订阅Stream. 或者创建BroadcastStream流可以通过继承Stream然后重写isBroadcast为true即可。
import 'dart:async';
void main() {
//1.创建一个int类型StreamController对象
StreamController<int> streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通过sink槽口添加int类型事件数据
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手动调用close方法发送一个done事件,onDone才会被回调
//3.注册监听
Stream stream = streamController.stream.asBroadcastStream();//转换成BroadcastStream
stream.listen((event) => print("first listener: $event"), onDone: () => print('first listener: is done'));
stream.listen((event) => print("second listener: $event"), onDone: () => print('second listener: is done'));
}
输出结果:
4. Stream使用的场景
Stream的使用场景有很多比如数据库的读写、文件IO的读写、基于多个网络请求转化处理都可以使用流来处理。下面会给出一个具体的文件复制例子实现IO文件读写使用Stream的场景。
import 'dart:async';
import 'dart:io';
void main() {
copyFile(File('/Users/mikyou/Desktop/gitchat/test.zip'),
File('/Users/mikyou/Desktop/gitchat/copy_dir/test.copy.zip'));
}
void copyFile(File sourceFile, File targetFile) async {
assert(await sourceFile.exists() == true);
print('source file path: ${sourceFile.path}');
print('target file path: ${targetFile.path}');
//以WRITE方式打开文件,创建缓存IOSink
IOSink sink = targetFile.openWrite();
//文件大小
int fileLength = await sourceFile.length();
//已读取文件大小
int count = 0;
//模拟进度条
String progress = "-";
//以只读方式打开源文件数据流
Stream<List<int>> inputStream = sourceFile.openRead();
inputStream.listen((List<int> data) {
count += data.length;
//进度百分比
double num = (count * 100) / fileLength;
print("${progress * (num ~/ 2)}[${num.toStringAsFixed(2)}%]");
//将数据添加到缓存sink中
sink.add(data);
}, onDone: () {
//数据流传输结束时,触发onDone事件
print("复制文件结束!");
//关闭缓存释放系统资源
sink.close();
});
}
输出结果:
5. Stream与Future的区别
实际上有了上面对Stream的介绍,相信很多人基本上都能分析出Stream和Future的区别了。先用官方专业术语做下对比:「Future 表示一个不会立即完成的计算过程」。与普通函数直接返回结果不同的是异步函数返回一个将会包含结果的 Future。该 Future 会在结果准备好时通知调用者「Stream 是一系列异步事件的序列」。其类似于一个异步的 Iterable,不同的是当你向 Iterable 获取下一个事件时它会立即给你,但是 Stream 则不会立即给你而是在它准备好时告诉你可以使用一个餐厅吃饭场景来理解Future和Stream的区别:
「Future」就好比你去一家餐厅吃饭,在前台点好你想吃的菜后,付完钱后服务员会给你一个等待的号码牌(相当于先拿到一个Future),后厨就开始根据你下的订单开始做菜,等到你的菜好了后,就可以通过号码牌拿到指定的菜了(返回的数据或异常信息)。「Stream」就好比去一家餐厅吃饭,在前台点好A,B,C,D4种你想吃的菜后(订阅数据流过程),然后你就去桌子等着,至于菜什么时候好,你也不知道所以就一直等着(类似于一直监听listen着),后厨就开始根据你下的订单开始做菜, 等着你的第一盘A种菜好了后,服务员就会主动传送A到你的桌子上(基于一种类似订阅-推送机制),没有特殊意外,服务员推送菜的顺序应该也是A,B,C,D。
6. 熊喵先生的小总结
到这里有关Dart异步编程中Stream就介绍完毕了,Stream在数据库读写,文件IO读写方面是非常实用。此外它支持响应式流式编程,可以利用它一些转换操作符实现对流的变换,可以实现一些类似Rx中的操作。
感谢关注,熊喵先生愿和你在技术路上一起成长!