[TOC]

异步流

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
simple().forEach { value -> println(value) }
}
fun simple(): Sequence<Int> = sequence { // 序列构建器
for (i in 1..3) {
Thread.sleep(100) // 假装我们正在计算
yield(i) // 产生下一个值
}
}

fun main() {
simple().forEach { value -> println(value) }
}

挂起函数

suspend fun simple(): List<Int> {
delay(1000) // 假装我们在这里做了一些异步的事情
return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}

流使用

fun simple(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}

fun main() = runBlocking<Unit> {
// 启动并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
simple().collect { value -> println(value) }
}

注意使用Flow类型构建器函数:

  • 名为 flow的 Flow类型构建器函数。
  • flow { ... } 构建块中的代码可以挂起。
  • 函数 simple 不再标有 suspend 修饰符。
  • 流使用 emit函数 发射 值。
  • 流使用 collect 函数 收集 值。

我们可以在 simpleflow { ... } 函数体内使用 Thread.sleep 代替 delay 以观察主线程在本案例中被阻塞了。

flow 构建器中的代码直到流被收集的时候才运行

流的超时取消

fun simple(): Flow<Int> = flow { 
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
simple().collect { value -> println(value) }
}
println("Done")
}

流构建器

flow { ... } 构建器是最基础的一个

flowOf 构建器定义了一个发射固定值集的流。
使用 .asFlow() 扩展函数,可以将各种集合与序列转换为流。

// 将一个整数区间转化为流
(1..3).asFlow().collect { value -> println(value) }

操作符

过渡流操作符

suspend fun performRequest(request: Int): String {
delay(1000) // 模仿长时间运行的异步工作
return "response $request"
}

fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一个请求流
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
response 1
response 2
response 3

转换操作符

(1..3).asFlow() // 一个请求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

限长操作符

fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}

fun main() = runBlocking<Unit> {
numbers()
.take(2) // 只获取前两个
.collect { value -> println(value) }
}

末端流操作符

末端操作符:

  • 转化为各种集合,例如 toList 与 toSet。
  • 获取第一个(first)值与确保流发射单个(single)值的操作符。
  • 使用 reduce 与 fold 将流规约到单个值。
val sum = (1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
.reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)

输出: 55

例子

过滤偶数并将其映射到字符串

(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

通常,withContext 用于在 Kotlin 协程中改变代码的上下文,但是 flow {…} 构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。