[TOC]

flowOn 操作符

该函数用于更改流发射的上下文。

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
log("Emitting $i")
emit(i) // 发射下一个值
}
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式

fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flowOn 操作符已改变流的默认顺序性。 现在收集发生在一个协程中(“coroutine#1”)而发射发生在运行于另一个线程中与收集协程并发运行的另一个协程(“coroutine#2”)中。当上游流必须改变其上下文中的 CoroutineDispatcher 的时候,flowOn 操作符创建了另一个协程。

缓冲

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // 缓冲发射项,无需等待
.collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}
1
2
3
Collected in 1054 ms

注意,当必须更改 CoroutineDispatcher 时,flowOn 操作符使用了相同的缓冲机制, 但是我们在这里显式地请求缓冲而不改变执行上下文。

合并

当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,而是只处理最新的那个。

当收集器处理它们太慢的时候, conflate 操作符可以用于跳过中间值。

val time = measureTimeMillis {
simple()
.conflate() // 合并发射项,不对每个值进行处理
.collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
1
3
Collected in 749 ms

处理最新值

val time = measureTimeMillis {
simple()
.collectLatest { value -> // 取消并重新发射最后一个值
println("Collecting $value")
delay(300) // 假装我们花费 300 毫秒来处理它
println("Done $value")
}
}
println("Collected in $time ms")

由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射一次,我们看到该代码块对每个值运行,但是只收集最后一个值:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 691 ms

Zip方法

val nums = (1..3).asFlow() // 数字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
.collect { println(it) } // 收集并打印
1 -> one
2 -> two
3 -> three

Combine

当流表示一个变量或操作的最新值时,可能需要执行计算,这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算。

val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1 -> one at 425 ms from start
2 -> two at 825 ms from start
3 -> three at 1227 ms from start
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" } // 使用“combine”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1 -> one at 443 ms from start
2 -> one at 644 ms from start
2 -> two at 845 ms from start
3 -> two at 944 ms from start
3 -> three at 1245 ms from start

flatMapConcat

连接模式由 flatMapConcat 与 flattenConcat 操作符实现。

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1: First at 127 ms from start
1: Second at 628 ms from start
2: First at 729 ms from start
2: Second at 1229 ms from start
3: First at 1329 ms from start
3: Second at 1830 ms from start

flatMapMerge

另一种展平模式是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发射值。 它由 flatMapMerge 与 flattenMerge 操作符实现。

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

flatMapLatest

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start