再谈协程之第三者Flow基础档案
点击上方蓝字关注我,知识会给你力量
该来的还是来了,LiveData提供了响应式编程的基础,搭建了一套数据观察者的使用框架,但是,它相当于RxJava这类的异步框架来说,有点略显单薄了,这也是经常被人诟病的问题,因此,Flow这个小三就顺应而生了。
Flow作为一套异步数据流框架,几乎可以约等于RxJava,但借助Kotlin语法糖和协程,以及Kotlin的DSL语法,可以让Flow的写法变得异常简洁,让你直面人性最善良的地方,一切的黑暗和丑陋,都被编译器消化了。而且,Flow作为LiveData的进化版本,可以很好的和JetPack结合起来,作为全家桶的一员,为统一架构添砖加瓦。
要理解FLow,首先需要了解Flow的各种操作符和基础功能,如果不理解这些,那么很难将Flow灵活运用,所以,本节主要来梳理Flow的基础。
Flow前言
首先,我们来看一个新的概念——冷流和热流,如果你看网上的Flow相关的文章,十有八九都会提到这个很冷门的名词。
❝Flow是早上冷的,到Channel才热起来。
❞
一个异步数据流,通常包含三部分:
上游 操作符 下游
所谓冷流,即下游无消费行为时,上游不会产生数据,只有下游开始消费,上游才从开始产生数据。
而所谓热流,即无论下游是否有消费行为,上游都会自己产生数据。
Flow操作符
Flow和RxJava一样,用各种操作符撑起了异步数据流框架的半边天。Flow默认为冷流,即下游有消费时,才执行生产操作。
所以,操作符也被分为两类——中间操作符和末端操作符,中间操作符不会产生消费行为,返回依然为Flow,而末端操作符,会产生消费行为,即触发流的生产。
Flow的创建
仅仅创建Flow,是不会执行Flow中的任何代码的,但我们首先,还是要看下如何创建Flow。
flow
通过flow{}构造器,可以快速创建Flow,在flow中,可以使用emit来生产数据(或者emitAll生产批量数据),示例如下。
flow {
for (i in 0..3) {
emit(i.toString())
}
}
flowOf
与listOf类似,Flow可以通过flowOf来产生有限的已知数据。
flowOf(1, 2, 3)
asFlow
asFlow用于将List转换为Flow。
listOf(1,2,3).asFlow()
emptyFlow
如题,创建一个空流。
末端操作符
末端操作符在调用之后,创建Flow的代码才会执行,这点和Sequence非常类似。
collect
collect是最常用的末端操作符,示例如下。
❝末端操作符都是suspend函数,所以需要运行在协程作用域中。
❞
MainScope().launch {
val time = measureTimeMillis {
flow {
for (i in 0..3) {
Log.d("xys", "emit value---$i")
emit(i.toString())
}
}.collect {
Log.d("xys", "Result---$it")
}
}
Log.d("xys", "Time---$time")
}
collectIndexed
带下标的collect,下标是Flow中的emit顺序。
MainScope().launch {
val time = measureTimeMillis {
flow {
for (i in 0..3) {
Log.d("xys", "emit value---$i")
emit(i.toString())
}
}.collectIndexed { index, value ->
Log.d("xys", "Result in $index --- $value")
}
}
Log.d("xys", "Time---$time")
}
collectLatest
collectLatest用于在collect中取消未来得及处理的数据,只保留当前最新的生产数据。
flowOf(1, 2, 3).collectLatest {
delay(1)
Log.d("xys", "Result---$it")
}
toCollection、toSet、toList
这些操作符用于将Flow转换为Collection、Set和List。
launchIn
在指定的协程作用域中直接执行Flow。
flow {
for (i in 0..3) {
Log.d("xys", "emit value---$i")
emit(i.toString())
}
}.launchIn(MainScope())
last、lastOrNull、first、firstOrNull
返回Flow的最后一个值(第一个值),区别是last为空的话,last会抛出异常,而lastOrNull可空。
flow {
for (i in 0..3) {
emit(i.toString())
}
}.last()
状态操作符
状态操作符不做任何修改,只是在合适的节点返回状态。
onStart:在上游生产数据前调用 onCompletion:在流完成或者取消时调用 onEach:在上游每次emit前调用 onEmpty:流中未产生任何数据时调用 catch:对上游中的异常进行捕获 retry、retryWhen:在发生异常时进行重试,retryWhen中可以拿到异常和当前重试的次数
MainScope().launch {
Log.d("xys", "Coroutine in ${Thread.currentThread().name}")
val time = measureTimeMillis {
flow {
for (i in 0..3) {
emit(i.toString())
}
throw Exception("Test")
}.retryWhen { _, retryCount ->
retryCount <= 3
}.onStart {
Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
Log.d("xys", "emit value---$it")
}.onCompletion {
Log.d("xys", "Flow Complete")
}.catch { error ->
Log.d("xys", "Flow Error $error")
}.collect {
Log.d("xys", "Result---$it")
}
}
Log.d("xys", "Time---$time")
}
另外,onCompletion也可以监听异常,代码如下所示。
.onCompletion { exception ->
Log.d("xys", "Result---$exception")
}
Transform操作符
与RxJava一样,在数据流中,我们可以利用操作符对数据进行各种变换,以满足操作流的不同需求。
map、mapLatest、mapNotNull
map操作符将Flow的输入通过block转换为新的输出。
flow {
for (i in 0..3) {
emit(i)
}
}.map {
it * it
}
transform、transformLatest
transform操作符与map操作符有点一样,但又不完全一样,map是一对一的变换,而transform则可以完全控制流的数据,进行过滤、 重组等等操作都可以。
flow {
for (i in 0..3) {
emit(i)
}
}.transform { value ->
if (value == 1) {
emit("!!!$value!!!")
}
}.collect {
Log.d("xys", "Result---$it")
}
transformWhile
transformWhile的返回值是一个bool类型,用来控制流的截断,如果返回true,则流继续执行,如果false,则流截断。
flow {
for (i in 0..3) {
emit(i)
}
}.transformWhile { value ->
emit(value)
value == 1
}.collect {
Log.d("xys", "Result---$it")
}
过滤操作符
如题,过滤操作符用于过滤流中的数据。
filter、filterInstance、filterNot、filterNotNull
过滤操作符可以按条件、类型或者对过滤取反、取非空等条件进行操作。
flow {
for (i in 0..3) {
emit(i)
}
}.filter { value ->
value == 1
}.collect {
Log.d("xys", "Result---$it")
}
drop、dropWhile、take、takeWhile
这类操作符可以丢弃前n个数据,或者是只拿前n个数据。带while后缀的,则表示按条件进行判断。
debounce
debounce操作符用于防抖,指定时间内的值只接收最新的一个。
sample
sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。
distinctUntilChangedBy
去重操作符,可以按照指定类型的参数进行去重。
组合操作符
组合操作符用于将多个Flow的数据进行组合。
combine、combineTransform
combine操作符可以连接两个不同的Flow。
val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
flow1.combine(flow2) { i, s -> i.toString() + s }.collect {
Log.d("xys", "Flow combine: $it")
}
输出为:
D/xys: Flow combine: 1a
D/xys: Flow combine: 2a
D/xys: Flow combine: 2b
D/xys: Flow combine: 2c
可以发现,当两个Flow数量不同时,始终由Flow1开始,用其最新的元素,与Flow2的最新的元素进行组合,形成新的元素。
merge
merge操作符用于将多个流合并。
val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
listOf(flow1, flow2).merge().collect {
Log.d("xys", "Flow merge: $it")
}
输出为:
D/xys: Flow merge: 1
D/xys: Flow merge: 2
D/xys: Flow merge: a
D/xys: Flow merge: b
D/xys: Flow merge: c
merge的输出结果是按照时间顺序,将多个流依次发射出来。
zip
zip操作符会分别从两个流中取值,当一个流中的数据取完,zip过程就完成了。
val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
flow1.zip(flow2) { i, s -> i.toString() + s }.collect {
Log.d("xys", "Flow zip: $it")
}
输出为:
D/xys: Flow zip: 1a
D/xys: Flow zip: 2b
线程切换
在Flow中,可以简单的使用flowOn来指定线程的切换,flowOn会对上游,以及flowOn之前的所有操作符生效。
flow {
for (i in 0..3) {
Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
emit(i)
}
}.map {
Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
it * it
}.flowOn(Dispatchers.IO).collect {
Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
Log.d("xys", "Result---$it")
}
这种情况下,flow和map的操作都将在子线程中执行。
而如果是这样:
flow {
for (i in 0..3) {
Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
emit(i)
}
}.flowOn(Dispatchers.IO).map {
Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
it * it
}.collect {
Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
Log.d("xys", "Result---$it")
}
这样map就会执行在主线程了。
同时,你也可以多次调用flowOn来不断的切换线程,让前面的操作符执行在不同的线程中。
取消Flow
Flow也是可以被取消的,最常用的方式就是通过withTimeoutOrNull来取消,代码如下所示。
MainScope().launch {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
Log.d("xys", "Flow: $it")
}
}
}
这样当输出1、2之后,Flow就被取消了。
❝Flow的取消,实际上就是依赖于协程的取消。
❞
Flow的同步非阻塞模型
首先,我们要理解下,什么叫同步非阻塞,默认场景下,Flow在没有切换线程的时候,运行在协程作用域指定的线程,这就是同步,那么非阻塞又是什么呢?我们知道emit和collect都是suspend函数,所谓suspend函数,就是会挂起,将CPU资源让出去,这就是非阻塞,因为suspend了就可以让一让,让给谁呢?让给其它需要执行的函数,执行完毕后,再把资源还给我。
所以,我们来看下面这个例子。
flow {
for (i in 0..3) {
emit(i)
}
}.onStart {
Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
Log.d("xys", "emit value---$it")
}.collect {
Log.d("xys", "Result---$it")
}
输出为:
D/xys: Start Flow in main
D/xys: emit value---0
D/xys: Result---0
D/xys: emit value---1
D/xys: Result---1
D/xys: emit value---2
D/xys: Result---2
D/xys: emit value---3
D/xys: Result---3
可以发现,emit一个,collect拿一个,这就是同步非阻塞,互相谦让,这样谁都可以执行,看上去flow中的代码和collect中的代码,就是同步执行的。
异步非阻塞模型
假如我们给Flow增加一个线程切换,让Flow执行在子线程,同样是上面的代码,我们再来看下执行情况。
flow {
for (i in 0..3) {
emit(i)
}
}.onStart {
Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
Log.d("xys", "emit value---$it")
}.flowOn(Dispatchers.IO).collect {
Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
Log.d("xys", "Result---$it")
}
输出为:
D/xys: Start Flow in DefaultDispatcher-worker-1
D/xys: emit value---0
D/xys: emit value---1
D/xys: emit value---2
D/xys: emit value---3
D/xys: Collect Flow in main
D/xys: Result---0
D/xys: Collect Flow in main
D/xys: Result---1
D/xys: Collect Flow in main
D/xys: Result---2
D/xys: Collect Flow in main
D/xys: Result---3
这个时候,Flow就变成了异步非阻塞模型,异步呢,就更好理解了,因为在不同线程,而此时的非阻塞,就没什么意义了,由于flow代码先执行,而这里的代码由于没有delay,所以是同步执行的,执行的同时,collect在主线程进行监听。
除了使用flowOn来切换线程,使用channelFlow也可以实现异步非阻塞模型。
向大家推荐下我的网站 https://xuyisheng.top/ 点击原文一键直达
专注 Android-Kotlin-Flutter 欢迎大家访问
往期推荐
更文不易,点个“三连”支持一下👇