【译】Kotlin 协程,JVM 线程以及并发问题

Flywith24

共 4021字,需浏览 9分钟

 · 2021-02-06

原文:Bridging the gap between coroutines, JVM threads, and concurrency problems
作者:Manuel Vivo
译者:Flywith24

「协程是轻量级的线程」,是不是经常听到这样的描述?这个描述对你理解协程有实质性的帮助吗?可能没有。阅读本文,您会对 「协程在 JVM 中实际的执行方式」,协程与线程的关系以及使用 JVM 线程模型时不可避免的 「并发问题」 有更多的了解。

协程与 JVM 线程

协程旨在简化执行异步操作的代码。基于 JVM 的协程的本质是:「传递给协程构建器的 lambda 代码块最终会在特定的 JVM 线程上执行」。如下面这个简单的 斐波那契数列(译者注:链接已改为百度百科)的计算:

// 在后台线程中计算第10个斐波那契数的协程
someScope.launch(Dispatchers.Default) {
    val fibonacci10 = synchronousFibonacci(10)
    saveFibonacciInMemory(10, fibonacci10)
}
private fun synchronousFibonacci(n: Long)Long { /* ... */ }

上面的 异步 协程代码块执行了同步且阻塞的斐波那契计算并将结果保存至内存。该代码块被 「协程库管理的线程池(通过 Dispatchers.Default 配置)分发调度」 并且在未来的某个时刻(取决于线程池的策略)在线程池中的线程执行。

请注意,因为没有挂起(suspend),所以上面的代码会在一个线程中执行。如果将执行的逻辑转移至不同的调度器(dispatcher),或者代码块可能在使用线程池的调度器中 yield / suspend,则协程可以在不同的线程中执行。

同样,如果没有协程,也可以使用线程手动执行上述逻辑,如下所示:

// 创建一个四个线程的线程池
val executorService = Executors.newFixedThreadPool(4)
// 在线程池中的线程上调度并执行下面代码
executorService.execute {
    val fibonacci10 = synchronousFibonacci(10)
    saveFibonacciInMemory(10, fibonacci10)
}

尽管手动管理线程池是可行的,但考虑到协程内置支持取消,更容易处理错误,使用可以降低内存泄露可能性的 结构化并发(structured concurrency) 以及 Jetpack 库的支持,「协程是 Android 中异步编程的推荐方案。」

背后的原理

开始创建协程到在线程中执行,这过程发生了什么?当使用标准的协程构建器创建协程时,您可以指定在特定的 CoroutineDispatcher 执行代码,默认将使用 Dispatchers.Default

「CoroutineDispatcher 负责将协程的执行分发给 JVM 线程」。原理是:当使用 CoroutineDispatcher 时,它会使用 interceptContinuation 拦截协程,该方法 「将 Continuation 包装在 DispatchedContinuation 中」。这是可行的,因为 CoroutineDispatcher 实现了 ContinuationInterceptor 接口。

如果您阅读过我的 协程工作原理 的文章,您已经知道编译器创建一个状态机,状态机的信息(如下一步需要执行的内容)保存在  Continuation 对象中。

如果需要在其它 Dispatcher 中执行 Continuation,DispatchedContinuation 的 resumeWith 方法负责分配给适合的协程!

此外,「DispatchedContinuation」DispatchedTask,在 JVM 中它是可在 JVM 线程上运行的 Runnable 对象!这很酷不是吗?当指定 CoroutineDispatcher 时,协程将转换为 DispatchedTask,该DispatchedTask 会作为一个 Runnable 在 JVM 线程上执行!

在创建协程时 dispatch 方法是如何调用的呢?使用标准的协程构建器创建协程,可以指定协程以 CoroutineStart 类型的 start 参数。例如,您可以使用 CoroutineStart.LAZY 将其配置为仅在需要时启动。默认情况下,使用 CoroutineStart.DEFAULT 来根据其 CoroutineDispatcher 调度协程执行。

协程中的代码块最终如何在线程中执行的图示

协程中的代码块最终如何在线程中执行的图示

调度器与线程池

您可以使用 Executor.asCoroutineDispatcher() 扩展函数将协程转换为 CoroutineDispatcher,从而在您的 app 线程池中执行协程。您也可以使用协程库中的默认 Dispatchers。

您可以在 createDefaultDispatcher 方法中看到如何初始化 Dispatchers.Default。默认情况下使用 DefaultScheduler。如果您查看 Dispatchers.IO 的实现,它还将使用 DefaultScheduler 并允许根据需要创建至少 64 个线程。Dispatchers.DefaultDispatchers.IO 隐式地连接在一起,因为它们使用相同的线程池。下面我们来看看使用不同的 Dispatcher 调用 withContext 的运行时开销是怎样的?

线程与 withContext 性能

在 JVM 中,如果创建的线程多于可用的 CPU 核心数,则在线程之间进行切换会带来一些运行时开销。上下文切换 的成本并不低!操作系统需要保存和恢复执行上下文,CPU 需要花时间调度线程而不是运行实际的 app 工作。除此之外,如果线程正在运行的代码阻塞了,也可能会发生上下文切换。如果线程是这种情况,将 withContext 与不同的 Dispatchers 配合使用是否会对性能造成损失?

幸运的是,如您所料,线程池为我们管理了这些复杂的场景,并尝试尽可能优化被执行的工作(这就是在线程池上执行工作比手动在线程中执行工作更好的原因)。协程也从中受益(因为它们是在线程池中调度的)!最重要的是,协程不阻塞线程,而是 suspend 工作!甚至更有效率!

默认情况下,CoroutineScheduler 是 JVM 实现中使用的线程池,「它以最有效的方式将分派的协程分配给工作线程」。由于 Dispatchers.DefaultDispatchers.IO 使用相同的线程池,因此优化了它们之间的切换,以尽可能避免线程切换。协程库可以优化这些调用,保留在相同的调度器(dispatcher)和线程上,并遵循一个快速路径(fast-path)。

由于 Dispatchers.Main 通常是 UI app 中不同的线程,因此在协程中 Dispatchers.DefaultDispatchers.Main 之间切换不会带来巨大的性能成本,因为协程只是挂起(即停止在一个线程中执行),并被调度到在另一个线程中执行。

协程中的并发问题

由于不同线程上的调度工作非常简单,协程 「确实」 使异步编程更容易。另一方面,这种简单性可能是一把双刃剑:「由于协程运行在 JVM 线程模型上,它们不能简单地摆脱线程模型带来的并发问题。」 因此,您必须注意避免并发问题。

多年来,不可变性(immutability)等良好实践已经缓解了您可能遇到的一些与线程有关的问题。然而,有些场景下不适合不可变性。所有并发问题的根源在于状态管理!特别是在多线程环境中访问 「可变状态」

多线程应用中的操作顺序是不可预测的。除了编译优化会带来有序性问题,上下文切换还可能带来原子性问题(译者注:并发问题可参考 译者的笔记)。如果在访问可变状态时未采取必要的预防措施,则线程可能会看到过时的数据,丢失更新或遭受 竞争状况 的困扰。

请注意,可变状态和访问顺序的问题不是 JVM 特有的,这些问题也会影响其它平台的协程。

使用协程的 app 本质上是一个多线程 app。「使用协程并且包含可变状态的类必须采取预防措施以确保执行结果符合预期」,即确保在协程中执行的代码能看到最新版本的数据。这样,不同的线程不会互相干扰。并发问题可能会导致非常小的错误,难以调试,甚至是 heisenbug!

这类问题并不罕见。例如可能一个类需要将已登录用户的信息保留在内存中,或者在应用运行时缓存某些值。如不小心,并发问题仍会在协程中发生!使用 withContext(defaultDispatcher) 的挂起函数不能总是在同一线程中执行!

假设我们有一个类可以缓存用户进行的交易。如果无法正确访问缓存,如下示例,则可能会发生并发错误:

class TransactionsRepository(
  private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {

  private val transactionsCache = mutableMapOf()

  private suspend fun addTransaction(user: User, transaction: Transaction) =
    // 小心!访问缓存是不受保护的。
    // 并发错误可能发生:线程可以看到过时的数据,竞争条件可能发生
    withContext(defaultDispatcher) {
      if (transactionsCache.contains(user)) {
        val oldList = transactionsCache[user]
        val newList = oldList!!.toMutableList()
        newList.add(transaction)
        transactionsCache.put(user, newList)
      } else {
        transactionsCache.put(user, listOf(transaction))
      }
    }
}

即使我们讨论的是 Kotlin,《Java 并发编程实践》(作者:Brian Goetz)一书也是了解更多这部分内容和 JVM 系统并发问题的绝佳资源。或者参考 Jetbrains 关于 共享可变状态和并发 的文档。

保护可变状态

如何保护可变状态或找到一个好的 同步 策略,完全取决于数据的性质和所涉及的操作。本节旨在使您意识到可能会遇到的并发问题,而不是列出保护可变状态的所有不同方法和 API。尽管如此,您还是可以从这里获得一些技巧和 API,以使得可变变量线程安全。

封装

可变状态应由一个 class 封装并拥有。该类集中对状态的访问,并根据场景使用更适合的同步策略来保护读写操作。

线程约束

有一种解决方案是限制对一个线程的读/写访问。可以使用队列以 生产者-消费者 的方式完成对可变状态的访问。JetBrains 对此有一个很好的文档。

不要重复造轮子

在 JVM 中,您可以使用线程安全的数据结构来保护可变变量。例如,对于简单计数器,可以使用 AtomicInteger。为了保护上面代码的 Map,可以使用 ConcurrentHashMap。ConcurrentHashMap 是一个线程安全的同步集合,可优化 Map 的读写吞吐量。

请注意,线程安全的数据结构不能防止调用方排序问题,它们只是确保内存访问是原子性的。当逻辑不太复杂时,它们有助于避免使用锁。例如,它们不能在上面显示的 transactionCache 示例中使用,因为操作顺序和它们之间的逻辑需要线程和访问保护。

同样,这些线程安全数据结构中的数据必须是不可变的或受保护的,以防止在修改已存储在其中的对象时出现竞争条件。

自定义解决方案

如果您有需要同步的复合操作,则 @Volatile 变量或线程安全的数据结构将无济于事!内置的 @Synchronized 注解可能不够精细,无法提高的效率。

在这种场景下,您可能需要使用并发工具(如 latch,信号量 或 屏障)创建自己的同步机制。其它场景,您可以使用锁或互斥锁保护代码的多线程访问。

Kotlin 中的 Mutex 具有 lock 和 unlock 的挂起函数以用来手动保护协程代码。Mutex.withLock 扩展函数使用很简单:

class TransactionsRepository(
  private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {

  // Mutex 保护缓存可变状态
  private val cacheMutex = Mutex()
  private val transactionsCache = mutableMapOf()

  private suspend fun addTransaction(user: User, transaction: Transaction) =
    withContext(defaultDispatcher) {
      // Mutex 使 读&写 缓存的操作 线程安全
      cacheMutex.withLock {
        if (transactionsCache.contains(user)) {
          val oldList = transactionsCache[user]
          val newList = oldList!!.toMutableList()
          newList.add(transaction)
          transactionsCache.put(user, newList)
        } else {
          transactionsCache.put(user, listOf(transaction))
        }
      }
    }
}

由于使用 Mutex 的协程在可以继续执行前会暂停执行,因此它比阻塞线程的 JVM 锁要有效得多。在协程中使用 JVM 同步类时要小心,因为这可能会阻塞在其中执行协程的线程并产生 liveness 问题。


传递给协程构建器的代码块最终在一个或多个 JVM 线程上执行。因此,协程运行在 JVM 线程模型中并受其所有约束。使用协程,仍会写出错误的多线程代码。因此,在代码中访问共享的可变状态要小心!

译文完。

译者总结

  • 基于 JVM 的 Kotlin 协程本质上是基于 JVM 线程池工作的
  • 协程是 Android 中异步编程的推荐方案
  • 协程也存在并发问题,开发者需要注意并解决
  • 并发问题的根源在于状态管理
  • 保护可变状态需要视具体情况而定,但有一些小技巧

推荐阅读

  • 并发问题出现的源头

  • 线程池

  • 码上开学:Kotlin 的协程用力瞥一眼 - 学不会协程?很可能因为你看过的教程都是错的

  • 码上开学:Kotlin 协程的挂起好神奇好难懂?今天我把它的皮给扒了

  • 码上开学:到底什么是「非阻塞式」挂起?协程真的更轻量级吗

关于我

人总是喜欢做能够获得正反馈(成就感)的事情,如果感觉本文内容对你有帮助的话,麻烦点亮一下👍,这对我很重要哦~

我是 Flywith24,「人只有通过和别人的讨论,才能知道我们自己的经验是否是真实的」,加我微信交流,让我们共同进步。

微信:Flywith24


浏览 14
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报