抽丝剥茧Kotlin - 协程
共 18202字,需浏览 37分钟
·
2020-08-23 10:30
前言
文章接上篇,这一篇我们好好聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。
故事还得从上次的协程分享开始,由于大家对协程的实践并不多,所以大家对下面的这段代码如何执行争论不休:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a + b
Log.e(TAG,"result:$c")
}
有人说,a 和 b 会串行执行,有人说,a 和 b 会并行执行,那么执行的结果到底是什么样的?我们将在下面的文章给出。
本个系列文章分为三篇,本文是第二篇:
“《即学即用Kotlin - 协程》
《抽丝剥茧Kotlin - 协程基础篇》
《抽丝剥茧Kotlin - 协程Flow篇》
一、结构简要介绍
首先,我们得明确协程中有哪些东西,如果你会使用协程,那你肯定知道协程中有 CoroutineScope
、CoroutineContext
和 CoroutineDispatcher
,这些都是使用过程中我们可以接触到的 API。
我简单的整理了协程中主要的基础类:
协程的类结构可分为三部分:CoroutineScope
、CoroutineContext
和 Continuation
。
1. Continuation
如果你会使用协程,那你肯定知道,协程遇到耗时 suspend
操作可以挂起,等到任务结束的时候,协程会自动切回来。
它的奥秘就是 Continuation
,Continuation
可以理解程续体,你可以理解其每次在协程挂起点将剩余的代码包括起来,等到结束以后执行剩余的内容。一个协程的代码块可能会被切割成若干个 Continuation
,在每个需要挂起的地方都会分配一个 Continuation
。
先抛出一些结论,协程在做耗时操作的时候,如果执行了耗时 suspend
操作,会自动挂起,但是这个耗时操作终究是要做的,只不过切换到其他线程去做了,做完以后协程就需要切回来,但是切到哪儿呢?这便是 Continuation
需要解决的问题。
Continuation
的流程是这样的:
无论是使用 launch
还是 async
启动的协程,都会有一个结束的时候用来回调的 continuation
。
2. CoroutineScope
关于 CoroutineScope
没有特别多要说的,它持有了 CoroutineContext
,主要对协程的生命周期进行管理。
3. CoroutineContext
一开始看 CoroutineContext
觉得特别晕,不明白为啥要这么设计,看了 Bennyhuo 大佬的文章以后才稍微好转。
从上面协程的类的机构中可以看出,光看这个 CoroutineContext
这个接口(源码内容我们下面讲),会发现它有点像 List
集合,而继承自 CoroutineContext
接口的 Element
接口则定义了其中的元素。
随后,这个 Element
接口被划分成了两种类,Job
和 ContinuationInterceptor
:
Job
:从字面上来讲,它代表一个任务,Thread
也是执行任务,所以我们可以理解它定义了协程的一些东西,比如协程的状态,协程和子协程的管理方式等等。ContinuationInterceptor
:也从字面上来看,它是Continuation
的拦截器,通过拦截Continuation
,完成我们想要完成的工作,比如说线程的切换。
二、结构源码分析
上面我们从概念上介绍了协程的三大件,在这部分,我们从源码分析。
1. Continuation
suspend
修饰的方法会在在编译期间被编译器做特殊处理,这种处理被成为CPS(续体转换风格) 转化,suspend
方法会被包裹成 Continuation
。
说了这么久的 Continuation
,我们还没有见过接口代码,由于接口内容不多,我就把所有的内容贴出来了:
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result)
}
我们重点关注Continuation#resumeWith()
方法,从注释来看,通过返回 suspend
挂起点的值来恢复协程的执行,协程可以从参数 Result
获取成功的值或者失败的结果,如果没有结果,那么 Result
的泛型是 Unit
。Resulut
这个类也特别简单,感兴趣的同学可以查看源码。
BaseContinuationImpl
实现了 Continuation
接口,我们看一下 Continuation#resumeWith
方法的实现:
internal abstract class BaseContinuationImpl(
// 完成后调用的 Continuation
public val completion: Continuation?
) : Continuation, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result =
try {
// 1. 执行 suspend 中的代码块
val outcome = invokeSuspend(param)
// 2. 如果代码挂起就提前返回
if (outcome === COROUTINE_SUSPENDED) return
// 3. 返回结果
Result.success(outcome)
} catch (exception: Throwable) {
// 3. 返回失败结果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 4. 如果 completion 中还有子 completion,递归
current = completion
param = outcome
} else {
// 5. 结果通知
completion.resumeWith(outcome)
return
}
}
}
}
}
主要的过程我在注释中已经标注出来了,我来解释一下 Continuation
的机制。
每个 suspend
方法生成的 BaseContinuationImpl
,其构造方法有一个参数叫 completion
,它也是一个 Continuation
,它的调用时机是在 suspen
方法执行完毕的时候。我们后面称
这个流程展示给我们的内容很直观了,简单起见,我们直接看3、4和5这一个 launch
启动流程就好,通常一个 launch
生成一个外层 Continuation
一个相应的结果 Continuation
,我们后面称结果 continuation
为 complete
,Continuation
调用顺序是:
调用外层 Continuation
中的Continuation#resumeWith()
方法。该方法会去执行 launch
包裹的代码块,并返回一个结果。将上述代码块执行的结果交给 completion
,由它完成协程结束的通知。
上述的过程只存在于一个 launch
并且里面没有执行其他耗时的挂起操作,对于这些情况,我们将会在下面的文章讨论。
抛出问题一:可以看到,在注释2,遇到耗时的 suspend
,返回的结果是一个 COROUTINE_SUSPENDED
,后面会直接返回,耗时操作结束的时候,我们的 completion
怎么恢复呢?
2. CoroutineContext 和 Element
在概要分析的时候,我们说 CoroutineContext
的结构像一个集合,是从它的接口得出结论的:
public interface CoroutineContext {
// get 方法,通过 key 获取
public operator fun get(key: Key): E?
// 累加操作
public fun fold(initial: R, operation: (R, Element) -> R): R
// 操作符 + , 实际的实现调用了 fold 方法
public operator fun plus(context: CoroutineContext): CoroutineContext
// 移除操作
public fun minusKey(key: Key<*>): CoroutineContext
// CoroutineContext 定义的 Key
public interface Key
// CoroutineContext 中元素的定义
public interface Element : CoroutineContext {
// key
public val key: Key<*>
//...
}
}
从中我们可以大致看出,CoroutineContext
中可以通过 Key
来获取元素 Element
,并且 Element
接口也是继承自 CoroutineContext
接口。
除此以外,CoroutineContext
支持增加和移除操作,并且支持 +
操作符来完成增加。+
操作符即 plus
方法是有具体实现的,感兴趣的可以自己看一下,主要涉及到了拦截器 ContinuationInterceptor
的添加。
1.1 Job
Job
的注释中阐述定义是这样的:
“A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
从中我们可以得出:
后台任务 可取消 生命周期在完成它的时候结束
从后台任务的角度来看,Job
听着有点像 Thread
,和 Thread
一样,Job
也有各种状态,文档中对 Job
各种状态的注释(感觉大佬们的注释写的真棒~):
Job
另一个值得关注的点是对子 Job
的管理,主要的规则如下:
子 Job
都会结束的时候,父Job
才会结束父 Job
取消的时候,子Job
也会取消
上述的一些内容都可以从 Job
的接口文档中得出。那么,Job
哪里来的?如果你看一下CoroutineScope#launch
方法,你就会得出结论,该方法的返回类型就是 Job
,我们每次调用该方法,都会创建一个 Job
。
1.2 ContinuationInterceptor
顾名思义,Continuation
拦截器,先看接口:
interface ContinuationInterceptor : CoroutineContext.Element {
// ContinuationInterceptor 在 CoroutineContext 中的 Key
companion object Key : CoroutineContext.Key
/**
* 拦截 continuation
*/
fun interceptContinuation(continuation: Continuation<T>): Continuation
//...
}
这个接口可以提炼的就这两个信息:
拦截器的 Key
,也就是说,无论你后面一个CoroutineContext
放了多少个拦截器,Key
为ContinuationInterceptor
的拦截器只能有一个。我们都知道, Continuation
在调用其Continuation#resumeWith()
方法,会执行其suspend
修饰的函数的代码块,如果我们提前拦截到,是不是可以做点其他事情,比如说切换线程,这也是ContinuationInterceptor
的作用之一。
需要说明一下,我们通过 Dispatchers
来指定协程发生的线程,Dispatchers
实现了 ContinuationInterceptor
接口。
3. CoroutineScope
CoroutineScope
的接口很简单:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
它要求后续的实现都要提供 CoroutineContext
,不过我们都知道,CoroutineContext
是协程中很重要的东西,既包括 Job
,也包括调度器。
在上面的代码中,我多次使用了 Android Jetpack 中的 Lifecycle 中协程的扩展库,好处我们获取 CoroutineScope
更加简单,无需在组件 onDestroy
的时候手动 cancel
,并且它的源码超级简单,前提是你会使用 Lifecycle
:
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
// ...
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}
并且它也支持你在指定的生命周期调用协程,大家看一下接口就明白了。
三、过程源码分析
先上一段使用代码:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
虽然代码很简单,但是源码还是比较复杂的,我们分步讲。
第一步 获取 CoroutineScope
我已经在上面说明了,我们使用的 Lifecycle
的协程拓展库,如果我们不使用拓展库,就得使用 MainScope
,它们的 CoroutineContext
都是一样的:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// ...
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// ...
return newScope
}
}
显而易见,MainScope
和 LifecycleCoroutineScope
都使用了 SupervisorJob() + Dispatchers.Main
, 作为它们的 CoroutineContext
。
说明一下,SupervisorJob
和Dispatchers.Main
很重要,它们分别代表了CoroutineContext
之前提及的 Job
和 ContinuationInterceptor
,后面用到的时候再分析。
第二步 启动协程
直接进入 CoroutineScope#launch()
方法:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
上面的方法一共有三个参数,前两个不作过多介绍,第三个参数:
block: suspend CoroutineScope.() -> Unit)
这是一个方法,是一个 lambda
参数,同时也表明了它需要被 suspend
修饰。继续看 launch
方法,发现它主要做了两件事:
组合新的 CoroutineContext
再创建一个 Continuation
组合新的CoroutineContext
在第一行代码 val newContext = newCoroutineContext(context)
做了第一件事,这里的 newCoroutineContext(context)
是一个扩展方法:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
CoroutineScope
使用本身的 coroutineContext
集合,利用 +
操作符将我们在 launch
方法中提供的 coroutineContext
添加进来。
再创建一个Continuation
回到上一段代码,通常我们不会指定 start
参数,所以它会使用默认的 CoroutineStart.DEFAULT
,最终 coroutine
会得到一个 StandaloneCoroutine
。
StandaloneCoroutine
实现自 AbstractCoroutine
,翻开上面的类图,你会发现,它实现了 Continuation
、Job
和 CoroutineScope
等一堆接口。需要说明一下,这个 StandaloneCoroutine
其实是我们当前 Suspend Contination
的 complete
。
接着会调用
coroutine.start(start, coroutine, block)
这就表明协程开始启动了。
第三步 start
进入到 AbstractCoroutine#start
方法:
public fun start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
跳过层层嵌套,最后到达了:
internal fun (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
虽然这仅仅是一个函数,但是后面主要的逻辑都揭露了:
创建一个没有拦截过的 Continuation
。拦截 Continuation
。执行 Continuation#resumeWith
方法。
第四步 又创建 Continuation
我这里用了 又,因为我们在 launch
中已经创建了一个 AbstractContinuaion
,不过它是一个 complete
,从各个函数的行参就可以看出来。
不过我们 suspend
修饰的外层 Continuation
还没有创建,它来了,是 SuspendLambda
,它继承自 ContinuationImpl
,如果你问我为什么源码中没找到具体实现,我觉得可能跟 suspend
修饰符有关,由编译器处理,但是调用栈确实是这样的:
看一下 SuspendLambda
类的实现:
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation?
) : ContinuationImpl(completion), FunctionBase, SuspendFunction {
constructor(arity: Int) : this(arity, null)
//...
}
可以看到,它的构造方法的形参就包括一个 complete
。
第五步 拦截处理
回到:
internal fun (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
里面的拦截方法 Continuation#intercepted()
方法是一个扩展方法:
@SinceKotlin("1.3")
public actual fun Continuation.intercepted(): Continuation =
(this as? ContinuationImpl)?.intercepted() ?: this
createCoroutineUnintercepted(receiver, completion)
返回的是一个 SuspendLambda
,所以它肯定是一个 ContinuationImpl
,看一下它的拦截方法的实现:
internal abstract class ContinuationImpl(
completion: Continuation?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
public fun intercepted(): Continuation =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...
}
在 ContinuationImpl#intercepted()
方法中,直接利用 context
这个数据结构通过 context[ContinuationInterceptor]
获取拦截器。
CoroutineDispatcher拦截实现
我们都知道 ContinuationInterceptor
具有拦截作用,它的直接实现是 CoroutineDispatcher
这个抽象类,所有其他调度器都直接或者间接继承这个类,我们关注一下它的拦截方法:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 1.拦截的 Continuation 被包了一层 DispatchedContinuation
public final override fun interceptContinuation(continuation: Continuation): Continuation =
DispatchedContinuation(this, continuation)
//...
}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation
) : DispatchedTask(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation by continuation {
// ...
override fun resumeWith(result: Result) {
// ...
if (dispatcher.isDispatchNeeded(context)) {
// 2. 后面一个参数需要提供 Runnable,父类已经实现
dispatcher.dispatch(context, this)
}
//...
}
// ...
}
// SchedulerTask 是一个 Runnable
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// ...
public final override fun run() {
// ...
try {
//...
withCoroutineContext(context, delegate.countOrElement) {
// 3. continuation 是 DispatchedContinuation 包裹的 continuation
continuation.resume(...)
}
}
//...
}
}
简单来说,就是对原有的 Continuation
的 resumeWith
操作加了一层拦截,就像这样:
加入 CoroutineDispatcher
以后,执行真正的 Continue#resumeWith()
之前,会执行 CoroutineDispatcher#dispatch()
方法,所以我们现在关注 CoroutineDispatcher#dispatch
具体实现即可。
讲一个CoroutineDispatcher具体实现
首先我们得明确这个 CoroutineDispatcher
来自哪里?它从 context
获取,context
来自哪里?
注意 SuspendLambda
和 ContinuationImpl
的构造方法,SuspendLambda
中的参数没有 CoroutineContext
,所以只能来自 completion
中的 CoroutineContext
,而completion
的 CoroutineContext
来自 launch
方法中来自 CoroutineScope
,默认是 SupervisorJob() + Dispatchers.Main
,不过只有 Dispatchers.Main
继承了 CoroutineDispatcher
。
Dispatchers.Main
是一个 MainCoroutineDispatcher
,Android 中对应的 MainCoroutineDispatcher
是 HandlerContext
:
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//...
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 利用主线程的 Handler 执行任务
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) {
// 利用主线程的 Handler 延迟执行任务,将完成的 continuation 放在任务中执行
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//..
}
重点来了,调度任务最后竟然交给了主线程的 Handler
,其实想想也对,主线程的任务最后一般都会交给主线程的 Handler
。
好奇的同学可能问了,如果不是主线程呢?不是主线程就利用的线程池:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// 执行期
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}
结果可以说是很清晰了,coroutineScheduler
是一个线程池,如果像了解具体的过程,同学们可以自行查看代码。
读到这里,你可能有一点明白 CoroutineContext
为什么要设计成一种数据结构:
coroutineContext[ContinuationInterceptor]
就可以直接取到当前协程的拦截器,并且一个协程只能对应一个调度器。调度器都放在其他 coroutineContext
的前面,所以在执行协程的时候,可以做拦截处理。
同理,我们也可以使用 coroutineContext[Job]
获取当前协程。
第六步 resumeWith
再次回到:
internal fun (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
现在我们看 Continue#resumeCancellableWith()
方法,它是一个扩展方法,里面的调度逻辑是:
DispatchContinuation#resumeCancellableWith
CoroutineDispatcher#dispatch
Continuation#resumeWith
这里的 Continuation
就是 SuspendLambda
,它继承了 BaseContinuationImpl
,我们看一下它的实现方法:
internal abstract class BaseContinuationImpl(
public val completion: Continuation?
) : Continuation, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result =
try {
// 1. 执行 suspend 里面的代码块
val outcome = invokeSuspend(param)
// 2. 如果代码块里面执行了挂起方法,会提前返回
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 3. 如果完成的completion也是BaseContinuationImpl,就会进入循环
current = completion
param = outcome
} else {
// 4. 执行 completion resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}
这边被我分为2个部分:
执行 suspend
方法,并获取结果调用 complete
(放在下一步讲)
执行suspend方法
在第一处会先执行 suspend
修饰的方法内容,在方法里面可能又会调度 suspend
方法,比如说我们的实例方法:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
因为我们在 getResult
执行了延时操作,所以我们 launch
方法肯定执行了耗时挂起方法,所以 BaseContinuationImpl#invokeSuspend
方法会返回一个 COROUTINE_SUSPENDED
,结果你也看到了,该方法会提前结束。(说明一下,我没有找到BaseContinuationImpl#invokeSuspend
方法的具体实现,我猜可能跟编译器有关)
我猜你肯定跟我一样好奇,遇到耗时挂起会提前返回,那么耗时挂起如何对 complete
进行恢复的?
我们看一下 delay(1000)
这个延时操作在主线程是如何处理的:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
//...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//...
}
可以看到,将恢复任务包了一个 Runnable
,交给 Handler
的 Handler#postDelayed()
方法了。
第七步 complete resumeWith
对于 complete
的处理一般会有两种。
complete是BaseContinuationImpl
第一种情况是我们称之为套娃,完成回调的 Continuation
它本身也有自己的完成回调 Continuation
,接下来循环就对了。
调用complete的resumeWith
第二种情况,就是通过 complete
去完成回调,由于 complete
是 AbstractContinuation
,我们看一下它的 resumeWith
:
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation, CoroutineScope {
// ...
public final override fun resumeWith(result: Result<T>) {
// 1. 获取当前协程的技术状态
val state = makeCompletingOnce(result.toState())
// 2. 如果当前还在等待完成,说明还有子协程没有结束
if (state === COMPLETING_WAITING_CHILDREN) return
// 3. 执行结束恢复的方法,默认为空
afterResume(state)
}
// 这是父类 JobSupport 中的 makeCompletingOnce 方法
// 为了方便查看,我复制过来
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
loopOnState { state ->
// tryMakeCompleting 的内容主要根据是否有子Job做不同处理
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
}
这段代码的意思其实也很简单,就是协程即将完成,得先评估一下协程的技术状态,别协程还有东西在运行,就给结束了。对于一些有子协程的一些协程,会等待子协程结束的时候,才会结束当前协程。
一个 launch
的过程大概就是这样了。大致的流程图是这样的:
下面我们再谈谈 async
。
四、关于async
async
和 launch
的代码相似度很高:
public fun CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
最终也会进行三步走:
internal fun (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
不同的是,async
返回的是一个 Deferred
,我们需要调用 Deferred#await()
去获取返回结果,它的实现在 JobSupport
:
private open class DeferredCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine(parentContext, active), Deferred, SelectClause1 {
// ... awaitInternal方法来自父类 JobSupport
override suspend fun await(): T = awaitInternal() as T
// ...
// 这是 JobSupport 中的实现
internal suspend fun awaitInternal(): Any? {
// 循环获取结果
while (true) { // lock-free loop on state
val state = this.state
// 1. 如果处于完成状态
if (state !is Incomplete) {
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
// 2. 除非需要重试,不然就 break
if (startInternal(state) >= 0) break
}
// 等待挂起的方法
return awaitSuspend() // slow-path
}
}
它的具体过程可以从我的注释看出,就不一一介绍了,感兴趣的同学可以查看源码。
1. 本文一开始的讨论
本文一开始的代码是错的,连编译器都过不了,尴尬~
正确的代码应该是:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a.await() + bawait()
Log.e(TAG,"result:$c")
}
如果是正确的代码,这里可能分两种情况:
如果你放在UI线程,那肯定是串行的,这时候有人说,我在 a
里使用 delay(1000)
,在 b
里使用 delay(2000)
,得到 c
的时候就花了 2000
毫秒啊,这不是并行吗?事情并不是这样的,delay
操作使用了 Handler#postDelay
方法,一个延迟了 1000
毫秒执行,一个延迟了 2000
毫秒执行,但是主线程只有一个,所以只能是串行。
如果是子线程,通常都是并行的,因为我们使用了线程池啊~
总结
写这边源码分析的时候,一些细节总是找不到,比如说 suspendLambda
的子类找不到,自己对 Kotlin 的学习有待深入。
所以本文有些地方还值得商榷,如果你有更好的理解,欢迎下方交流。
关于我
我是九心,新晋互联网码农,如果想要进阶和了解更多的干货,欢迎关注我的公众号接收到的我的最新文章。