Kotlin Flow啊,你将流向何方?
创始人
2024-02-06 17:16:09
0

前言

前边一系列的协程文章铺垫了很久,终于要分析Flow了。如果说协程是Kotlin的精华,那么Flow就是协程的精髓。
通过本篇文章,你将了解到:

  1. 什么是流?
  2. 为什么引进Flow?
  3. Fow常见的操作
  4. 为什么说Flow是冷流?

1. 什么是流

image.png
自然界的流水,从高到低,从上游到下游流动。

而对于计算机世界的流:

数据的传递过程构成了数据流,简称流

比如想要查找1~1000内的偶数,可以这么写:

    var i = 0var list = mutableListOf()while (i < 1000) {if (i % 2 == 0)list.add(i)i++}

此处对数据的处理即为找出其中的偶数。
若想要在偶数中找到>500的数,则继续筛选:

    var i = 0var list = mutableListOf()while (i < 1000) {if (i > 500 && i % 2 == 0)list.add(i)i++}

可以看出,原始数据是1~1000,我们对它进行了一些操作:过滤偶数、过滤>500的数。当然还可以进行其它操作,如映射、变换等。
提取上述过程三要素:

  1. 原始数据
  2. 对数据的一系列操作
  3. 最终的数据

把这一系列的过程当做流:
image.png

从流的方向来观察,我们称原始数据为上流,对数据进行一系列处理后,最终的数据为下流。
从流的属性来观察,我们认为生产者在上流生产数据,消费者在下流消费数据。

2. 为什么引进Flow?

由前面的文章我们知道,Java8提供了StreamAPI,专用来操作流,而Kotlin也提供了Sequence来处理流。
那为什么还要引进Flow呢?
在Kotlin的世界里当然不会想再依赖Java的StreamAPI了,主要来对比Kotlin里的各种方案选择。
先看应用场景的演变。

a、集合获取多个值
想要获取多个值,很显而易见的想到了集合。

    fun testList() {//构造集合fun list(): List = listOf(1, 2, 3)list().forEach {//获取多个值println("value = $it")}}

以上函数功能涉及两个对象:生产者和消费者。
生产者:负责将1、2、3构造为集合。
消费者:负责从集合里将1、2、3取出。
若此时想要控制生产者的速度,比如先将1放到集合里,过1秒后再讲2放进集合,在此种场景下该函数显得不那么灵活了。

b、Sequence控制生成速度
Sequence可以生产数据,先看看它是怎么控制生产速度的。

    fun testSequence() {fun sequence():Sequence = sequence {for (i in 1..3) {Thread.sleep(1000)yield(i)}}sequence().forEach {println("value = $it")}}

通过阻塞线程控制了生产者的速度。
你可能会说:在协程体里为啥要用Thread.sleep()阻塞线程呢,用delay()不香吗?
看起来很香,我们来看看实际效果:
image.png
直接报编译错误了,提示是:受限制的挂起函数只能调用自己协程作用域内的成员和其它挂起函数。
而sequence的作用域是SequenceScope,查看其定义发现:
image.png
究其原因,SequenceScope 被RestrictsSuspension 修饰限制了。

c、集合配合协程使用
sequence 因为协程作用域的限制,不能异步生产数据,而使用集合却没此限制。

    suspend fun testListDelay() {suspend fun list():List {delay(1000)return listOf(1, 2, 3)}list().forEach {println("value = $it")}}

但也暴露了一个缺陷,只能一次性的返回集合元素。

综上所述:

不管是集合还是Sequence,都不能完全覆盖流的需求,此时Flow闪亮登场了

3. Fow常见的操作

最简单的Flow使用

    suspend fun testFlow1() {//生产者var flow = flow {//发射数据emit(5)}//消费者flow.collect {println("value=$it")}}

通过flow函数构造一个flow对象,然后通过调用flow.collect收集数据。
flow函数的闭包为生产者的生产逻辑,collect函数的闭包为消费者的消费逻辑。

当然,还有更简单的写法:

    suspend fun testFlow2() {//生产者flow {//发射数据emit(5)}.collect {//消费者println("value=$it")}}

执行流程:
image.png

Flow操作符

上面只提到了flow数据的发送以及接收,并没有提及对flow数据的操作。
flow提供了许多操作符方便我们对数据进行处理(对流进行加工)。
我们以寻找1~1000内大于500的偶数为例:

    suspend fun testFlow3() {//生产者var flow = flow {for (i in 1..1000) {emit(i)}}.filter { it > 500 && it % 2 == 0 }//消费者flow.collect {println("value=$it")}}

filter函数的作用根据一定的规则过滤数据,一般称这种函数为flow的操作符。
当然还可以对flow进行映射、变换、异常处理等。

    suspend fun testFlow3() {//生产者var flow = flow {for (i in 1..1000) {emit(i)}}.filter { it > 500 && it % 2 == 0 }.map { it - 500 }.catch {//异常处理}//消费者flow.collect {println("value=$it")}}

中间操作符
前面说过流的三要素:原始数据、对数据的操作、最终数据,对应到Flow上也是一样的。
flow的闭包里我们看做是原始数据,而filter、map、catch等看做是对数据的操作,collect闭包里看做是最终的数据。
filter、map等操作符属于中间操作符,它们负责对数据进行处理。

中间操作符仅仅只是预先定义一些对流的操作方式,并不会主动触发动作执行

末端操作符
末端操作符也叫做终端操作符,调用末端操作符后,Flow将从上流发出数据,经过一些列中间操作符处理后,最后流到下流形成最终数据。
如上面的collect操作符就是其中一种末端操作符。

怎么区分中间操作符和末端操作符呢?
和Sequence操作符类似,可以通过返回值判断。
先看看中间操作符filter:

public inline fun  Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value ->if (predicate(value)) return@transform emit(value)
}internal inline fun  Flow.unsafeTransform(@BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit
): Flow = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal usecollect { value ->// kludge, without it Unit will be returned and TCE won't kick in, KT-28938return@collect transform(value)}
}

可以看出,filter操作符仅仅只是构造了Flow对象,并重写了collect函数。

再看末端操作符collect:

public suspend inline fun  Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =collect(object : FlowCollector {override suspend fun emit(value: T) = action(value)})

返回值为Unit,并且通过调用collect最终调用了emit,触发了流。

Flow相比Sequence、Collection的优势

Sequence对于协程的支持不够好,不能调用其作用域外的suspend函数,而Collection生产数据不够灵活,来看看Flow是如何解决这些问题的。

    suspend fun testFlow4() {//生产者var flow = flow {for (i in 1..1000) {delay(1000)emit(i)}}.flowOn(Dispatchers.IO)//切换到io线程执行//消费者flow.collect {delay(1000)println("value=$it")}}

如上,flow的生产者、消费者闭包里都支持调用协程的suspend函数,同时也支持切换线程执行。
再者,flow可以将集合里的值一个个发出,可调整其流速。
当然,flow还提供了许多操作符帮助我们实现各种各样的功能,此处限于篇幅就不再深入。
万变不离其宗,知道了原理,一切迎刃而解。

4. 为什么说Flow是冷流?

flow 的流动

在sequence的分析里有提到过sequence是冷流,那么什么是冷流呢?

没有消费者,生产者不会生产数据
没有观察者,被观察者不会发送数据

    suspend fun testFlow5() {//生产者var flow = flow {println("111")for (i in 1..1000) {emit(i)}}.filter {println("222")it > 500 && it % 2 == 0}.map {println("333")it - 500}.catch {println("444")//异常处理}

如上代码,只要生产者没有消费者,该函数运行后不会有任何打印语句输出。
这个时候将消费者加上,就会触发流的流动。

还是以最简单的flow demo为例,看看其调用流程:
image.png

图上1~6步骤即为最简单的flow调用流程。
可以看出,只有调用了末端操作符(如collect)之后才会触发flow的流动,因此flow是冷流。

flow 的原理

    suspend fun testFlow1() {//生产者var flow = flow {//发射数据emit(5)}//消费者flow.collect {println("value=$it")}}

以上代码涉及到三个关键函数(flow、emit、collect),两个闭包(flow闭包、collect闭包。
从上面的调用图可知,以上五者的调用关系:

flow–>collect–>flow闭包–>emit–>collect闭包

接下来逐一分析在代码里的关系。

先看生产者动作(flow函数)
flow函数实现:


public fun  flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)

传入的参数类型为:FlowCollector的扩展函数,而FlowCollector是接口,它有唯一的函数:emit(xx)。因此在flow函数的闭包里可以调用emit(xx)函数,flow闭包作为SafeFlow的成员变量block。
flow 函数返回SafeFlow,SafeFlow继承自AbstractFlow,并实现了collect函数:

#Flow.ktpublic final override suspend fun collect(SafeCollector: FlowCollector) {//构造SafeCollector//collector 作为SafeCollector的成员变量val safeCollector = SafeCollector(collector, coroutineContext)try {//抽象函数,子类实现collectSafely(safeCollector)} finally {safeCollector.releaseIntercepted()}}

collect的闭包作为SafeCollector的成员变量collector,后面会用到。
由此可见:flow函数仅仅只是构造了flow对象并返回。

再看消费者动作(collect)
当消费者调用flow.collect函数时:

public suspend inline fun  Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =collect(object : FlowCollector {override suspend fun emit(value: T) = action(value)})

此时调用的collect即为flow里定义的collect函数,并构造了匿名对象FlowCollector,实现了emit函数,而emit函数的真正实现为action,也就是外层传入的collect的闭包。

上面分析到的collect源码里调用了collectSafely:

    private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() {override suspend fun collectSafely(collector: FlowCollector) {collector.block()}}

此处的block即为在构造flow对象时传入的闭包。
此时,消费者通过collect函数已经调用到生产者的闭包里

还剩下最后一个问题:生产者的闭包是如何流转到消费者的闭包里呢?

最后看发射动作(emit)
在生产者的闭包里调用了emit函数:

    override suspend fun emit(value: T) {//挂起函数return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->try {//uCont为当前协程续体emit(uCont, value)} catch (e: Throwable) {// Save the fact that exception from emit (or even check context) has been thrownlastEmissionContext = DownstreamExceptionElement(e)throw e}}}private fun emit(uCont: Continuation, value: T): Any? {val currentContext = uCont.contextcurrentContext.ensureActive()// This check is triggered once per flow on happy path.val previousContext = lastEmissionContextif (previousContext !== currentContext) {checkContext(currentContext, previousContext, value)}completion = uCont//collector.emit 最终调用collect的闭包return emitFun(collector as FlowCollector, value, this as Continuation)}

如此一来,生产者的闭包里调用emit函数后,将会调用到collect的闭包里,此时数据从flow的上游流转到下游。
总结以上步骤,其实本质还是对象调用。

中间操作符的原理
以filter为例:

    public inline fun  Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value ->//判断过滤条件是否满足,若是则发送数据if (predicate(value)) return@transform emit(value)}internal inline fun  Flow.unsafeTransform(@BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit): Flow = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use//调用当前对象collectcollect { value ->// kludge, without it Unit will be returned and TCE won't kick in, KT-28938return@collect transform(value)}}internal inline fun  unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow {//构造flow,重写collectreturn object : Flow {override suspend fun collect(collector: FlowCollector) {collector.block()}}}

filter操作符构造了新的flow对象,该对象重写了collect函数。
当调用flow.collect时,先调用到filter对象的collect,进而调用到原始flow的collect,接着调用到原始flow对象的闭包,在闭包里调用的emit即为filter的闭包,若filter闭包里条件满足则调动emit函数,最后调用到collect的闭包。
image.png

理解中间操作符的要点:

  1. 中间操作符返回新的flow对象,重写了collect函数
  2. collect函数会调用当前flow(调用filter的flow对象)的collect
  3. collect函数做其它的处理

与sequence类似,使用了装饰者模式。
以上以filter为例阐述了原理,其它中间操作符的原理类似,此处就不再细说。

下篇将分析Flow的背压与线程切换,相信分析的逻辑会让大家耳目一新,敬请期待~

本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 如何确定大小/onMeasure()多次执行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...