Skip to content

Latest commit

 

History

History
1050 lines (786 loc) · 48.7 KB

8.Kotlin_协程.md

File metadata and controls

1050 lines (786 loc) · 48.7 KB

8.Kotlin_协程

协程并不是Kotlin提出来的概念,其他的一些编程语言,例如Go、Python等都可以在语言层面上实现协程,甚至是Java,也可以通过使用扩展库来间接的支持协程。

Kotlin引入了协程(Coroutine)来支持更好的异步操作,利用它我们可以避免在异步编程中使用大量的回调,同时相比传统多线程技术,它更容易提升系统的高并发处理能力。

协程(coroutine),又称微线程,是一种无优先级的子程序调度组件,由协程构建器(coroutine builder)启动。

协程本质上是一种用户态的轻量级线程

起源

线程是由操作系统来进行调度的,当操作系统切换线程的时候,会产生一定的消耗。而协程不一样,协程是包含于线程的,也就是说协程 是工作在线程之上的,协程的切换可以由程序自己来控制,不需要操作系统进行调度。这样的话就大大降低了开销。

协程就像一个轻量级的线程在幕后,启动协程就像启动一个单独的执行线程。线程在其他语言中很常见,例如Java,协程和线程可以并行运行,并互相通信。然而,不同点在于使用协程比使用线程更加高效。在性能方面,启动一个线程并使其保持运行是非常昂贵的。

处理器通常只能同时运行有限数量的线程,并且运行尽可能少的线程会更高效。

而另一方面,协程默认运行在共享的线程池中,同一个线程可以运行多个协程。由于使用的线程较少,当你想要运行异步任务时,使用协程会更加高效。

也就是说本质上Kotlin协程就是创建了一个可以复用的线程池,并且协程的delay是一个特殊的挂起函数,它不会造成线程堵塞,但是会挂起协程,并且只能在协程中使用。

协程是语言层面的东西,线程是系统层面的东西。
协程就是一段代码块,既然是代码那就离不开CPU的执行,而CPU调度的基本单位是线程。

可以说,只要内存足够,一个线程中可以包含任意多个协程,但某一时刻却只能有一个协程在运行,多个协程分享该线程资源。

进程、线程、协程

  • 进程:一段程序的执行过程,资源分配和调度的基本单位,有其独立地址空间,互相之间不发生干扰
  • 线程:轻量级进程,资源调度的最小单位,共享父进程地址空间和资源,其调度和进程一样要切换到内核态
  • 并行:同时发生,在多核CPU中,多个任务可同时在不同CPU上面同一时间执行
  • 并发:宏观并行,微观串行,操作系统根据相关算法,分配时间片来调度,从而达到一种宏观上并行的方式
  • 上下文:程序执行的状态,通常用调用栈记录程序执行的当前状态以及其相关的环境信息

早期,CPU是单核,无法真正并行,为了产生共享CPU的假象,提出了时间片概念,将时间分割成连续的时间片段,多个程序交替获得CPU使用权限。 而管理时间片分配调度的调度器则成为操作系统的核心组件。

程序能交替执行,但上下文切换必然会引起程序相关变量混乱,因此在物理地址基础上提出虚拟地址概念 CPU增加内存管理单元,进行虚拟地址和物理地址的转换 操作系统加入内存管理模块,管理物理内存和虚拟内存

进程是一个实体,包括程序代码以及其相关资源(内存,I/O,文件等),可被操作系统调度。但想一边操作I/O进行输入输出,一边想进行加减计算,就得两个进程,这样写代码,内存就爆表了。于是又想着能否有一轻量级进程呢,只执行程序,不需要独立的内存,I/O等资源,而是共享已有资源,于是产生了线程。

一个进程可以跑很多个线程处理并发,但是线程进行切换的时候,操作系统会产生中断,线程会切换到相应的内核态,并进行上下文的保存,这个过程不受上层控制,是操作系统进行管理。
然而内核态线程会产生性能消耗,因此线程过多,并不一定提升程序执行的效率。

正是由于:

  1. 线程的调度不能精确控制;
  2. 线程的切换会产生性能消耗。

协程出现了。

协程:

  1. 协程是一种轻量级的用户态线程
  2. 开发者自行控制程序切换时机,而不是像进程和线程那样把控制权交给操作系统
  3. 协程没有线程、进程切换的时间和资源开销
  4. 协程是非抢占式调度,当前协程切换到其他协程是由自己控制;线程则是时间片用完抢占时间片调度

优点:

  1. 用户态,语言级别
  2. 无切换性能消耗
  3. 非抢占式
  4. 同步代码思维
  5. 减少同步锁

缺点:

  1. 注意全局变量
  2. 阻塞操作会导致整个线程被阻塞

简单地讲,Kotlin 的协程就是一个封装在线程上面的线程框架。

它有两个非常关键的亮点:

  • 耗时函数自动后台,从而提高性能;
  • 线程的「自动切回」

所以,Kotlin 的协程在 Android 开发上的核心好处就是:消除回调地域。

线程操作需要VM或OS的支持,通过CPU调度来执行生效。
而协程主要通过编译技术来实现,通过参入代码来生效,不需要VM或OS的额外支持。

简单来说,协程是编译器级的,而线程是操作系统级的。

协程运行在同一个线程上,没有上下文切换。 和线程不同,协程有多个入口点,可以在指定的位置挂起和恢复执行,而线程通常只有一个入口点,且只能返回一次。
其次,协程不需要多线程的锁机制,因为协程只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不需要加锁,只需要判断状态即可,因此执行效率比多线程高很多。
另外,使用协程后,不再需要像异步编程时那样写很多回调函数,代码结构不再支离破碎,整个代码逻辑看上去和同步代码没有什么区别,代码结构更加优雅简洁。

使用

如需在 Android 项目中使用协程,请将以下依赖项添加到应用的build.gradle文件中:

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9'
}

示例

在代码中,我们使用GlobalScope.launch在后台运行一个新的协程。在幕后,它创建了一个新的线程使协程在其中运行,

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
        println("World!") // 在延迟后打印输出
    }
    println("Hello,") // 协程已在等待时主线程还在继续
    Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活
}
// 运行结果
Hello,
World!

本质上,协程是轻量级的线程。它们在某些CoroutineScope上下文中与launch协程构建器一起启动。 这里我们在ClobalScope中启动了一个新的协程,这意味着新协程的生命周期只受整个应用程序的生命周期限制。

使用runBlocking在相同作用域内运行协程如果希望代码在相同线程、不同协程中运行,你可以使用runBlocking函数。runBlocking是一个高阶函数,它会阻塞当前线程,直到传入其中的代码运行完成。runBlocking函数定义了一个作用域,传入其中的代码继承该作用域。在本例中,我们可以使用该作用域在同一线程中运行不同的协程。

delay函数暂停当前协程在这种情况下,更好的解决方案是使用协程的delay函数取而代之。该函数与Thread.sleep有相似的效果,除了它会暂停当前协程而不是当前线程。它将协程挂起指定的时长,这允许运行该线程中的其他代码。delay函数可用于以下两种情况:

  • 在协程中使用。
  • 在某个编译器知道将会暂停或挂起的函数中使用,在函数中调用可挂起的函数时(例如delay),该函数必须被标记为suspend。

例如,以下代码将协程延迟1秒:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主线程中的代码会立即执行
    runBlocking {     // 但是这个表达式阻塞了主线程
        delay(2000L)  // ……我们延迟 2 秒来保证 JVM 的存活
    } 
}

调用了runBlocking的主线程会一直阻塞直到runBlocking内部的协程执行完毕。

这个示例可以使用更合乎惯用法的方式重写,使用runBlocking来包装main函数的执行:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> { // 开始执行主协程
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主协程在这里会立即执行
    delay(2000L)      // 延迟 2 秒来保证 JVM 存活
}

这里的runBlocking { …… }作为用来启动顶层主协程的适配器。 我们显式指定了其返回类型Unit, 因为在Kotlin中main函数必须返回Unit类型。runBlocking为最高级的协程,也就是主协程,launch创建的协程能够在 runBlocking中运行(反过来是不行的)。所以上面的代码可以看做是在一个线程中创建了一个主协程,然后在主协程中创建了一个输出为“World!”的子协程。

等待一个作业

上面我们使用delay(2000L)来让主线程延迟2秒,保证JVM的存活,但是这样并不是一个好的实现方案,因为在很多情况下我们并不知道耗时的任务要执行多久。

val job = GlobalScope.launch { // 启动一个新协程并保持对这个作业的引用
    delay(1000L)
    println("World!")
}
println("Hello,")
job.join() // 等待直到子协程执行结束

加了job.join()后,程序就会一直等待,直到我们启动的协程结束。注意,这里的等待是非堵塞式的等待,不会将当前线程挂起。

结构化的并发

协程的实际使用还有一些需要改进的地方。 当我们使用 GlobalScope.launch 时,我们会创建一个顶层协程。虽然它很轻量, 但它运行时仍会消耗一些内存资源。如果我们忘记保持对新启动的协程的引用,它还会继续运行。如果协程中的代码挂起了会怎么样 (例如,我们错误地延迟了太长时间),如果我们启动了太多的协程并导致内存不足会怎么样? 必须手动保持对所有已启动协程的引用并 join 之很容易出错。

有一个更好的解决办法。我们可以在代码中使用结构化并发。 我们可以在执行操作所在的指定作用域内启动协程, 而不是像通常使用线程 (线程总是全局的)那样在 GlobalScope 中启动。

在我们的示例中,我们使用 runBlocking 协程构建器将 main 函数转换为协程。 包括 runBlocking 在内的每个协程构建器都将 CoroutineScope 的实例添加到其代码块所在的作用域中。 我们可以在这个作用域中启动协程而无需显式 join 之,因为外部协程(示例中的 runBlocking) 直到在其作用域中启动的所有协程都执行完毕后才会结束。因此,可以将我们的示例简化为:

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { // 在 runBlocking 作用域中启动一个新协程
        delay(1000L)
        println("World!")
    }
    println("Hello,")
}

作用域构建器

除了由不同的构建器提供协程作用域之外,还可以使用 coroutineScope 构建器声明自己的作用域。它会创建一个协程作用域并且在所有已启动子协程执行完毕之前不会结束。

runBlockingcoroutineScope 可能看起来很类似,因为它们都会等待其协程体以及所有子协程结束。 主要区别在于,runBlocking 方法会阻塞当前线程来等待, 而 coroutineScope 只是挂起,会释放底层线程用于其他用途。 由于存在这点差异, runBlocking 是常规函数, 而 coroutineScope 是挂起函数。

可以通过以下示例来演示:

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { 
        delay(200L)
        println("Task from runBlocking")
    }
    
    coroutineScope { // 创建一个协程作用域
        launch {
            delay(500L) 
            println("Task from nested launch")
        }
    
        delay(100L)
        println("Task from coroutine scope") // 这一行会在内嵌 launch 之前输出
    }
    
    println("Coroutine scope is over") // 这一行在内嵌 launch 执行完毕后才输出
}
// 执行结果
Task from coroutine scope
Task from runBlocking
Task from nested launch
Coroutine scope is over

提取函数重构

我们来将 launch { …… } 内部的代码块提取到独立的函数中。当你对这段代码执行“提取函数”重构时,你会得到一个带有 suspend 修饰符的新函数。 这是你的第一个挂起函数。在协程内部可以像普通函数一样使用挂起函数, 不过其额外特性是,同样可以使用其他挂起函数(如本例中的 delay)来挂起协程的执行。

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

// 这是你的第一个挂起函数
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

// 执行结果
Hello,
World!

取消协程的执行

在一个长时间运行的应用程序中,你也许需要对你的后台协程进行细粒度的控制。 比如说,一个用户也许关闭了一个启动了协程的界面, 那么现在协程的执行结果已经不再被需要了,这时,它应该是可以被取消的。 该 launch 函数返回了一个可以被用来取消运行中的协程的 Job

val job = launch {
    repeat(1000) { i ->
        println("job: I'm sleeping $i ...")
        delay(500L)
    }
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该作业
job.join() // 等待作业执行结束
println("main: Now I can quit.")
// 执行结果
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

一旦main函数调用了 job.cancel,我们在其它的协程中就看不到任何输出,因为它被取消了。 这里也有一个可以使 Job 挂起的函数 cancelAndJoin 它合并了对 cancel 以及 join 的调用。

取消是协作的

协程的取消是协作的。一段协程代码必须协作才能被取消。 所有 kotlinx.coroutines中的挂起函数都是 可被取消的 。 它们检查协程的取消,并在取消时抛出 CancellationException。 然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的,就如如下示例代码所示:

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
        // 每秒打印消息两次
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消一个作业并且等待它结束
println("main: Now I can quit.")
// 执行结果
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

运行示例代码,并且我们可以看到它连续打印出了“I'm sleeping”,甚至在调用取消后, 作业仍然执行了五次循环迭代并运行到了它结束为止。

使计算代码可取消

我们有两种方法来使执行计算的代码可以被取消。第一种方法是定期调用挂起函数来检查取消。 对于这种目的 yield 是一个好的选择。 另一种方法是显式的检查取消状态。让我们试试第二种方法。

将前一个示例中的 while (i < 5) 替换为 while (isActive) 并重新运行它。

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // 可以被取消的计算循环
        // 每秒打印消息两次
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并等待它结束
println("main: Now I can quit.")
// 执行结果
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

你可以看到,现在循环被取消了。isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。

finally 中释放资源

我们通常使用如下的方法处理在被取消时抛出CancellationException 的可被取消的挂起函数。比如说,try {……} finally {……}表达式以及Kotlin的use 函数一般在协程被取消的时候执行它们的终结动作:

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并且等待它结束
println("main: Now I can quit.")
// 执行结果
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

阻塞 vs 挂起

基本上,协程计算可以被挂起而无需阻塞线程。线程阻塞的代价通常是昂贵的,尤其在高负载时,因为只有相对少量线程实际可用,因此阻塞其中一个会导致一些 重要的任务被延迟。

另一方面,协程挂起几乎是无代价的。不需要上下文切换或者OS的任何其他干预。最重要的是,挂起可以在很大程度上由用户库控制:
作为库的作者,我们可以决定挂起时发生什么并根据需求优化/记日志/截获。

另一个区别是,协程不能在随机的指令中挂起,而只能在所谓的挂起点挂起,这会调用特别标记的函数。

对于回调式的写法,如果并发场景再复杂一些,代码的嵌套可能会更多,这样的话维护起来就非常麻烦。但如果你使用了 Kotlin 协程,多层网络请求只需要这么写:

coroutineScope.launch(Dispatchers.Main) {       // 开始协程:主线程
    val token = api.getToken()                  // 网络请求:IO 线程
    val user = api.getUser(token)               // 网络请求:IO 线程
    nameTv.text = user.name                     // 更新 UI:主线程
}

协程最简单的使用方法,其实在前面章节就已经看到了。我们可以通过一个 launch 函数实现线程切换的功能:

coroutineScope.launch(Dispatchers.IO) {
    ...
}
复制代码

这个 launch 函数,它具体的含义是:我要创建一个新的协程,并在指定的线程上运行它。这个被创建、被运行的所谓「协程」是谁?就是你传给 launch 的那些代码,这一段连续代码叫做一个「协程」。

所以,什么时候用协程?当你需要切线程或者指定线程的时候。你要在后台执行任务?切!

launch(Dispatchers.IO) {
    val image = getImage(imageId)
}

然后需要在前台更新界面?再切!

coroutineScope.launch(Dispatchers.IO) {
    val image = getImage(imageId)
    launch(Dispatch.Main) {
        avatarIv.setImageBitmap(image)
    }
}

好像有点不对劲?这不还是有嵌套嘛。

如果只是使用 launch 函数,协程并不能比线程做更多的事。不过协程中却有一个很实用的函数:withContext 。这个函数可以切换到指定的线程,并在闭包内的逻辑执行结束之后,自动把线程切回去继续执行。那么可以将上面的代码写成这样:

coroutineScope.launch(Dispatchers.Main) {      // 👈 在 UI 线程开始
    val image = withContext(Dispatchers.IO) {  // 👈 切换到 IO 线程,并在执行完成后切回 UI 线程
        getImage(imageId)                      // 👈 将会运行在 IO 线程
    }
    avatarIv.setImageBitmap(image)             // 👈 回到 UI 线程更新 UI
} 

这种写法看上去好像和刚才那种区别不大,但如果你需要频繁地进行线程切换,这种写法的优势就会体现出来。可以参考下面的对比:

// 第一种写法
coroutineScope.launch(Dispachers.IO) {
    ...
    launch(Dispachers.Main){
        ...
        launch(Dispachers.IO) {
            ...
            launch(Dispacher.Main) {
                ...
            }
        }
    }
}

// 通过第二种写法来实现相同的逻辑
coroutineScope.launch(Dispachers.Main) {
    ...
    withContext(Dispachers.IO) {
        ...
    }
    ...
    withContext(Dispachers.IO) {
        ...
    }
    ...
}

由于可以"自动切回来",消除了并发代码在协作时的嵌套。由于消除了嵌套关系,我们甚至可以把 withContext 放进一个单独的函数里面:

launch(Dispachers.Main) {              // 👈 在 UI 线程开始
    val image = getImage(imageId)
    avatarIv.setImageBitmap(image)     // 👈 执行结束后,自动切换回 UI 线程
}
//                               👇
fun getImage(imageId: Int) = withContext(Dispatchers.IO) {
    ...
}

这就是之前说的「用同步的方式写异步的代码」了。

不过如果只是这样写,编译器是会报错的:

fun getImage(imageId: Int) = withContext(Dispatchers.IO) {
    // IDE 报错 Suspend function'withContext' should be called only from a coroutine or another suspend funcion
}

意思是说,withContext 是一个 suspend 函数,它需要在协程或者是另一个 suspend 函数中调用。

挂起函数

当我们调用标记有特殊修饰符suspend的函数时,会发生挂起:

suspend fun doSomething(foo: Foo): Bar {
    ……
}

这样的函数称为挂起函数,因为调用它们可能挂起协程(如果相关调用的结果已经可用,库可以决定继续进行而不挂起)。

挂起函数能够以与普通函数相同的方式获取参数和返回值,但它们只能从协程和其他挂起函数中调用。

事实上,要启动协程,必须至少有一个挂起函数,它通常是匿名的(即它是一个挂起lambda表达式)。

让我们来看一个例子,一个简化的async()函数 (源自kotlinx.coroutines库):

fun <T> async(block: suspend () -> T)

这里的async()是一个普通函数(不是挂起函数),但是它的block参数具有一个带suspend修饰符的函数类型:suspend() -> T。 所以,当我们将一个lambda表达式传给async()时,它会是挂起lambda表达式,于是我们可以从中调用挂起函数:

async {
    doSomething(foo)
    ……
}

await()可以是一个挂起函数(因此也可以在一个async {}块中调用),该函数挂起一个协程,直到一些计算完成并返回其结果:

async {
    ……
    val result = computation.await()
    ……
}

请注意,挂起函数await()doSomething()不能在像main()这样的普通函数中调用:

fun main(args: Array<String>) {
    doSomething() // 错误:挂起函数从非协程上下文调用
}

还要注意的是,挂起函数可以是虚拟的,当覆盖它们时,必须指定suspend修饰符:

interface Base {
    suspend fun foo()
}

class Derived: Base {
    override suspend fun foo() { …… }
}

默认顺序调用

假设我们在不同的地方定义了两个进行某种调用远程服务或者进行计算的挂起函数。我们只假设它们都是有用的,但是实际上它们在这个示例中只是为了该目的而延迟了一秒钟:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了一些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了一些有用的事
    return 29
}

如果需要按 顺序 调用它们,我们接下来会做什么——首先调用 doSomethingUsefulOne 接下来 调用 doSomethingUsefulTwo,并且计算它们结果的和吗? 实际上,如果我们要根据第一个函数的结果来决定是否我们需要调用第二个函数或者决定如何调用它时,我们就会使用普通的顺序来进行调用,因为这些代码是运行在协程中的,只要像常规的代码一样 顺序 都是默认的。 下面的示例展示了测量执行两个挂起函数所需要的总时间:

val time = measureTimeMillis {
    val one = doSomethingUsefulOne()
    val two = doSomethingUsefulTwo()
    println("The answer is ${one + two}")
}
println("Completed in $time ms")

它的打印输出如下:

The answer is 42
Completed in 2017 ms

使用async并发

如果doSomethingUsefulOnedoSomethingUsefulTwo之间没有依赖,并且我们想更快的得到结果,让它们进行并发吗? 这就是async可以帮助我们的地方。

在概念上,async 就类似于launch。 它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发的工作。不同之处在于launch返回一个 Job 并且不附带任何结果值, 而async返回一个Deferred —— 一个轻量级的非阻塞可取消的future,这代表了一个将会在稍后提供结果的promise。你可以使用.await() 在一个延期的值上得到它的最终结果, 但是Deferred也是一个Job,所以如果需要的话,你可以取消它。

val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

它的打印输出如下:

The answer is 42
Completed in 1017 ms

这里快了两倍,因为两个协程并发执行。请注意,使用协程进行并发总是显式的。

惰性启动的 async

可选的,async 可以通过将 start 参数设置为 CoroutineStart.LAZY 而变为惰性的。 在这个模式下,只有结果通过 await 获取的时候协程才会启动, 或者在 Jobstart 函数调用的时候。运行下面的示例:

val time = measureTimeMillis {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    // 执行一些计算
    one.start() // 启动第一个
    two.start() // 启动第二个
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

它的打印输出如下:

The answer is 42
Completed in 1017 ms

因此,在前面的例子中这里定义的两个协程没有执行,但是控制权在于程序员准确的在开始执行时调用 start。 我们首先调用 one,然后调用 two,接下来等待这个协程执行完毕。

注意,如果我们只是在 println 中调用 await, 而没有在单独的协程中调用 start,这将会导致顺序行为, 直到 await 启动该协程 执行并等待至它结束, 这并不是惰性的预期用例。 在计算一个值涉及挂起函数时,这个 async(start = CoroutineStart.LAZY) 的用例用于替代标准库中的 lazy 函数。

协程上下文与调度器

协程总是运行在一些以 CoroutineContext 类型为代表的上下文中,它们被定义在了 Kotlin 的标准库里。

协程上下文是各种不同元素的集合。其中主元素是协程中的 Job, 我们在前面见过它以及它的调度器。

调度器与线程

协程上下文包含一个 协程调度器 (参见 CoroutineDispatcher) 它确定了相关的协程在哪个线程或哪些线程上执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。

所有的协程构建器诸如 launchasync 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

尝试下面的示例:

launch { // 运行在父协程的上下文中,即 runBlocking 主协程
    println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
    println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 将会获取默认调度器
    println("Default               : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
    println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

当调用 launch { …… } 时不传参数,它从启动了它的 CoroutineScope 中承袭了上下文(以及调度器)。在这个案例中,它从 main 线程中的 runBlocking 主协程承袭了上下文。

Dispatchers.Unconfined 是一个特殊的调度器且似乎也运行在 main 线程中,但实际上, 它是一种不同的机制,这会在后文中讲到。

当协程在 GlobalScope 中启动时, 使用的是由 Dispatchers.Default 代表的默认调度器。 默认调度器使用共享的后台线程池。 所以 launch(Dispatchers.Default) { …… }GlobalScope.launch { …… } 使用相同的调度器。

newSingleThreadContext 为协程的运行启动了一个线程。 一个专用的线程是一种非常昂贵的资源。 在真实的应用程序中两者都必须被释放,当不再需要的时候, 使用 close 函数, 或存储在一个顶层变量中使它在整个应用程序中被重用。

非受限调度器 vs 受限调度器

Dispatchers.Unconfined 协程调度器在调用它的线程启动了一个协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这完全由被调用的挂起函数来决定。 非受限的调度器非常适用于执行不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。

另一方面,该调度器默认继承了外部的 CoroutineScoperunBlocking 协程的默认调度器, 特别是, 当它被限制在了调用者线程时,继承自它将会有效地限制协程在该线程运行并且具有可预测的 FIFO 调度。

launch(Dispatchers.Unconfined) { // 非受限的——将和主线程一起工作
    println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
    delay(500)
    println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
}
launch { // 父协程的上下文,主 runBlocking 协程
    println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
    delay(1000)
    println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}

执行后的输出:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

所以,该协程的上下文继承自 runBlocking {...} 协程并在 main 线程中运行, 当 delay 函数调用的时候, 非受限的那个协程在默认的执行者线程中恢复执行。

非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该在通常的代码中使用。

命名协程以用于调试

当协程经常打印日志并且你只需要关联来自同一个协程的日志记录时, 则自动分配的 id 是非常好的。然而,当一个协程与特定请求的处理相关联时或做一些特定的后台任务, 最好将其明确命名以用于调试目的。 CoroutineName 上下文元素与线程名具有相同的目的。当调试模式开启时, 它被包含在正在执行此协程的线程名中。

下面的例子演示了这一概念:

log("Started main coroutine")
// 运行两个后台值计算
val v1 = async(CoroutineName("v1coroutine")) {
    delay(500)
    log("Computing v1")
    252
}
val v2 = async(CoroutineName("v2coroutine")) {
    delay(1000)
    log("Computing v2")
    6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")

程序执行使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程 并且同时显式指定一个命名:

launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

这段代码使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I'm working in thread DefaultDispatcher-worker-1 @test#2

协程作用域

让我们将关于上下文,子协程以及作业的知识综合在一起。假设我们的应用程序拥有一个具有生命周期的对象,但这个对象并不是一个协程。 举例来说,我们编写了一个 Android 应用程序并在 Android 的 activity 上下文中启动了一组协程来使用异步操作拉取并更新数据以及执行动画等等。 所有这些协程必须在这个 activity 销毁的时候取消以避免内存泄漏。当然,我们也可以手动操作上下文与作业,以结合 activity 的生命周期与它的协程, 但是 kotlinx.coroutines 提供了一个封装:CoroutineScope 的抽象。 你应该已经熟悉了协程作用域,因为所有的协程构建器都声明为在它之上的扩展。

我们通过创建一个 CoroutineScope 实例来管理协程的生命周期, 并使它与 activity 的生命周期相关联。CoroutineScope 可以通过 CoroutineScope() 创建或者通过MainScope() 工厂函数。 前者创建了一个通用作用域,而后者为使用 Dispatchers.Main 作为默认调度器的 UI 应用程序 创建作用域:

class Activity {
    private val mainScope = MainScope()

    fun destroy() {
        mainScope.cancel()
    }
    // 继续运行……

现在,我们可以使用定义的 scope 在这个 Activity 的作用域内启动协程。 对于该示例,我们启动了十个协程,它们会延迟不同的时间:

// 在 Activity 类中
    fun doSomething() {
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

在 main 函数中我们创建 activity,调用测试函数 doSomething,并且在 500 毫秒后销毁这个 activity。 这取消了从 doSomething 启动的所有协程。我们可以观察到这些是由于在销毁之后, 即使我们再等一会儿,activity 也不再打印消息。

val activity = Activity()
activity.doSomething() // 运行测试函数
println("Launched coroutines")
delay(500L) // 延迟半秒钟
println("Destroying activity!")
activity.destroy() // 取消所有的协程
delay(1000) // 为了在视觉上确认它们没有工作

这个示例的输出如下所示:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

你可以看到,只有前两个协程打印了消息,而另一个协程在 Activity.destroy() 中单次调用了 job.cancel()

通道

延期的值提供了一种便捷的方法使单个值在多个协程之间进行相互传输。 通道提供了一种在流中传输值的方法。

通道基础

一个 Channel 是一个和 BlockingQueue 非常相似的概念。其中一个不同是它代替了阻塞的 put 操作并提供了挂起的 send, 还替代了阻塞的 take 操作并提供了挂起的 receive

val channel = Channel<Int>()
launch {
    // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
    for (x in 1..5) channel.send(x * x)
}
// 这里我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")

这段代码的输出如下:

1
4
9
16
25
Done!

关闭与迭代通道

和队列不同,一个通道可以通过被关闭来表明没有更多的元素将会进入通道。 在接收者中可以定期的使用 for 循环来从通道中接收元素。

从概念上来说,一个 close 操作就像向通道发送了一个特殊的关闭指令。 这个迭代停止就说明关闭指令已经被接收了。所以这里保证所有先前发送出去的元素都在通道关闭前被接收到。

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // 我们结束发送
}
// 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭)
for (y in channel) println(y)
println("Done!")

Architecture Component Coroutine

如果你使用了Architecture Component,那么你也可以在其基础上使用Coroutine,因为Kotlin Coroutine已经提供了相应的api并且定制了CoroutineScope。 在ViewModel中,为了能够使用Coroutine提供了viewModelScope.launch,同时一旦ViewModel被清除,对应的Coroutine也会自动取消。

 fun getAll() {
        viewModelScope.launch {
            val articleList = withContext(Dispatchers.IO) {
                articleDao.getAll()
            }
            adapter.clear()
            adapter.addAllData(articleList)
        }
    }

在IO线程通过articleDao从数据库取数据,一旦数据返回,在主线程进行处理。如果在取数据的过程中ViewModel已经清除了,那么数据获取也会停止,防止资源的浪费。

对于Lifecycle,提供了LifecycleScope,我们可以直接通过launch来创建Coroutine

    private fun coroutine() {
        lifecycleScope.launch {
            delay(2000)
            showToast("coroutine first")
            delay(2000)
            showToast("coroutine second")
        }
    }

因为Lifecycle是可以感知组件的生命周期的,所以一旦组件onDestroy了,相应的LifecycleScope.launch闭包中的调用也将取消停止。

RxJava与协程

异步就是同时进行一个以上彼此目的不同的任务。但是对于有前后依赖关系的任务,我们就需要利用异步中的回调机制处理。而异步回调机制又会导致代码结构过分耦合,遇到多重函数回调的嵌套耦合,也就是回调地狱,会导致代码难以维护。而解决回调地狱的方案就是链式调用结构。很自然想到使用RxJava来解决回调地狱,它确实可以很方便地解决上面的问题。

RxJava,准确来讲是 ReactiveX 在 Java 上的实现,是一种响应式程序框架,我们通过它提供的「Observable」的编程范式进行链式调用,可以很好地消除回调。但是RxJava中流的创建、转化与消费都需要使用到各种类和丰富的操作符,这加大了RxJava的学习成本,因为你无法保证团队里面每一个成员都能看懂它。

使用协程,同样可以像 Rx 那样有效地消除回调地狱,不过无论是设计理念,还是代码风格,两者是有很大区别的,协程在写法上和普通的顺序代码类似。在串行的执行中,虽然代码确实是顺序执行的,但其实是在不同的线程上顺序执行的。因为串行的执行中,执行是阻塞式的,主线程的阻塞会导致很严重的问题,所以所有的耗时操作不能在主线程中执行,所以就需要多线程并行来执行。在并行的执行中,异步回调其实就是代码的多线程顺序执行。 那能不能既按照顺序的方式编写代码,又可以让代码在不同的线程顺序执行,自动完成线程的切换工作呢?

那就是Kotlin协程。 Kotlin 的协程是一种无栈协程的实现,它的控制流转依靠对协程体本身编译生成的状态机的状态流转来实现,变量保存也是通过闭包语法来实现的。异步回调就是代码的多线程顺序执行,而Kotlin协程可以实现顺序编写异步代码,自动进行线程切换。

没法比较RxJava和协程哪个好,或者谁能取代谁,因为Rx和协程的设计思想本来就不同。