[TOC]

协程

kotlinx.coroutines 是由 JetBrains 开发的功能丰富的协程库。它涵盖了的很多启用高级协程的原语,包括 launchasync 等等。

本质上,协程是轻量级的线程。 它们在某些 CoroutineScope上下文中与 launch 协程构建器 一起启动。

当我们在 GlobalScope 中启动了一个新的协程,这意味着新协程的生命周期只受整个应用程序的生命周期限制。

delay是一个特殊的 挂起函数 ,它不会造成线程阻塞,但是会 挂起 协程,并且只能在协程中使用。

基础

阻塞与非阻塞

  • 非阻塞的 delay(……)
  • 阻塞的 Thread.sleep(……)
  • 阻塞的runBlocking{..delay(….)..}

注意: runBlocking会阻塞主线程,直到内部执行完成

例子

fun main() = runBlocking<Unit> { // 开始执行主协程
GlobalScope.launch { // 在后台启动一个新的协程并继续,这与局部启动(launch {})不同
delay(1000L)
println("World!")
}
println("Hello,") // 主协程在这里会立即执行
delay(2000L) // 延迟 2 秒来保证 JVM 存活,或者使用 job.join()
}

挂起函数单元测试

class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// 这里我们可以使用任何喜欢的断言风格来使用挂起函数
}
}

作用域构建器

  • runBlocking 方法会阻塞当前线程来等待;
  • coroutineScope 只是挂起,会释放底层线程用于其他用途。

runBlocking 是常规函数,而 coroutineScope 是挂起函数。

fun main() = runBlocking { // this: CoroutineScope
launch {
delay(200L)
println("Task from runBlocking")
}

coroutineScope { // 创建一个协程作用域
launch {
delay(500L)
println("Task from nested launch")
}

delay(100L)
println("Task from coroutine scope") // 这一行会在内嵌 launch 之前输出
}

println("Coroutine scope is over") // 这一行在内嵌 launch 执行完毕后才输出
}
Task from coroutine scope
Task from runBlocking
Task from nested launch
Coroutine scope is over

提取函数重构

fun main() = runBlocking {
launch { doWorld() }
println("Hello,")
}

// 这是你的第一个挂起函数
suspend fun doWorld() {
delay(1000L)
println("World!")
}
Hello,
World!

注意:在 GlobalScope 中启动的活动协程并不会使进程保活。它们就像守护线程。

取消与超时

fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该作业
job.join() // 等待作业执行结束;可以用cancelAndJoin 取代上面两行
println("main: Now I can quit.")
}
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的

有两种方法来使执行计算的代码可以被取消。第一种方法是定期调用挂起函数来检查取消。对于这种目的 yield 是一个好的选择。 另一种方法是显式的检查取消状态。

isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。

运行不能取消的代码块

所有良好的关闭操作(关闭一个文件、取消一个作业、或是关闭任何一种通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。

当你需要挂起一个被取消的协程,你可以将相应的代码包装在 withContext(NonCancellable) {……} 中,并使用 withContext 函数以及 NonCancellable 上下文

val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并等待它结束
println("main: Now I can quit.")
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.

超时

例子1

val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // 在它运行得到结果之前取消它
}
println("Result is $result")
I'm sleeping 0 ...
I'm sleeping 1 ...
Result is null

例子2

import kotlinx.coroutines.*

var acquired = 0

class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}

fun main() {
runBlocking {
repeat(100_000) { // Launch 100K coroutines
launch {
var resource: Resource? = null // Not acquired yet
try {
withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
resource = Resource() // Store a resource to the variable if acquired
}
// We can do something else with the resource here
} finally {
resource?.close() // Release the resource if it was acquired
}
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}

注意:该例子是安全的,因为都在同一个线程中。

async 并发

在概念上,async 就类似于 launch。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发的工作。不同之处在于 launch 返回一个 Job 并且不附带任何结果值,而 async 返回一个 Deferred —— 一个轻量级的非阻塞 future, 这代表了一个将会在稍后提供结果的 promise。你可以使用 .await() 在一个延期的值上得到它的最终结果, 但是 Deferred 也是一个 Job,所以如果需要的话,你可以取消它。

请注意,使用协程进行并发总是显式的。

async 可以通过将 start 参数设置为 CoroutineStart.LAZY 而变为惰性的。 在这个模式下,只有结果通过 await 获取的时候协程才会启动,或者在 Job 的 start 函数调用的时候。

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// 执行一些计算
one.start() // 启动第一个
two.start() // 启动第二个
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假设我们在这里做了些有用的事
return 13
}

suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假设我们在这里也做了些有用的事
return 29
}

协程上下文与调度器

所有的协程构建器诸如 launch 和 async 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

newSingleThreadContext 为协程的运行启动了一个线程。 一个专用的线程是一种非常昂贵的资源。 在真实的应用程序中两者都必须被释放,当不再需要的时候,使用 close 函数,或存储在一个顶层变量中使它在整个应用程序中被重用。

launch(Dispatchers.Default) { …… }GlobalScope.launch { …… } 使用相同的调度器。

launch { // 运行在父协程的上下文中,即 runBlocking 主协程
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 将会获取默认调度器
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
Unconfined            : I'm working in thread main @coroutine#3
Default : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4
main runBlocking : I'm working in thread main @coroutine#2
newSingleThreadContext: I'm working in thread MyOwnThread @coroutine#5

非受限调度器 vs 受限调度器

Dispatchers.Unconfined 协程调度器在调用它的线程启动了一个协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这完全由被调用的挂起函数来决定。非受限的调度器非常适用于执行不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。