文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Kotlin 协程异步热数据流的设计与使用讲解

2024-04-02 19:55

关注

一.异步冷数据流

在Kotlin协程:协程的基础与使用中,通过使用协程中提供的flow方法可以创建一个Flow对象。这种方法得到的Flow对象实际上是一个异步冷数据流,代码如下:

private suspend fun test() {
    val flow = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }
    GlobalScope.launch {
        // 触发flow执行
        flow.collect {
            Log.d("liduo", "test1: $it")
        }
    }
    GlobalScope.launch {
        // 再次触发flow执行
        flow.collect {
            Log.d("liduo", "test2: $it")
        }
    }
}

在上面的代码中,通过调用flow方法,构建了一个名为flow对象,并对flow对象异步执行了两次。每次都会打印出1、2、3、4,然后结束执行。无论谁在前谁在后,无论执行多少次,得到的结果都是相同的,这就是异步冷数据流的一个特点。

二.异步热数据流

既然有冷数据流,那就一定有热数据流。在协程中提供了MutableSharedFlow方法来创建异步热数据流。相比于异步冷数据流,异步热数据流一般在类似广播订阅的场景中使用。

1.异步热数据流的设计

在异步热数据流中,核心接口的继承关系如下图所示:

1)SharedFlow接口

SharedFlow接口继承自Flow接口,代码如下:

public interface SharedFlow<out T> : Flow<T> {
    // 用于保存最近的已经发送的数据
    public val replayCache: List<T>
}

2)MutableSharedFlow接口

MutableSharedFlow接口继承自SharedFlow接口与FlowCollector接口,并在此基础上定义了两个方法与一个常量,代码如下:

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    // 该方法用于尝试发射一个数据,
    // 当返回true时表示发射成功,返回false时,表示缓存空间不足,需要挂起。
    public fun tryEmit(value: T): Boolean
    // 该常量表示当前SharedFlow的订阅者的数量,
    // 该常量是一个状态流StateFlow,也是一个热流,当其中数值发生变化时会进行回调通知
    public val subscriptionCount: StateFlow<Int>
    // 用于清空replayCache
    // 在调用该方法之前老的订阅者,可以继续收到replaycache中的缓存数据,
    // 在调用该方法之后的新的订阅者,只能收到emit方法发射的新数据
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}

2.异步热数据流的使用

1)MutableSharedFlow方法

在协程中,可以通过调用MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    ...
}

其中构造方法中三个参数的含义如下:

当创建MutableSharedFlow类型的对象时,可以通过参数replay确定SharedFlow接口中定义的replayCache的最大容量,通过参数extraBufferCapacity设置一个不包括replay大小的缓存数量。replayCache本质上也是缓存的一部分,因此extraBufferCapacity与replay共同决定了缓存的大小。

对于处理数据慢的订阅者,可以通过从缓存中获取数据,以此来避免发射者的挂起。缓存的数量大小决定了数据处理快的订阅者与数据处理慢的订阅者之间的延迟程度。

当使用默认的构造方法创建MutableSharedFlow类型的对象时,它的缓存数量为0。当调用它的emit方法时会直接挂起,直到所有的订阅者都处理完当前emit方法发送的数据,才会恢复emit方法的挂起。如果MutableSharedFlow类型的对象没有订阅者,则调用emit方法会直接返回。

2)使用示例

代码如下:

private suspend fun test() {
    // 创建一个热流
    val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND)
    // 启动一个协程,发射数据:1
    // 由于有缓存,因此会被添加到缓存中,不会挂起
    GlobalScope.launch {
        flow.emit(1)
    }
    // 将MutableSharedFlow对象转换为SharedFlow对象
    // SharedFlow对象不能调用emit方法,因此只能用于接收
    val onlyReadFlow = flow.asSharedFlow()
    // 接收者1
    // 启动一个新协程
    GlobalScope.launch {
        // 订阅监听,当collect方法触发订阅时,会首先会调onSubscription方法
        onlyReadFlow.onSubscription {
            Log.d("liduozuishuai", "test0: ")
            // 发射数据:3
            // 向下游发射数据:3,其他接收者收不到
            emit(3)
        }.onEach {
            // 处理接收的数据
            Log.d("liduozuishuai", "test1: $it")
        }.collect()
    }
    // 接收者2
    // 启动一个新的协程
    GlobalScope.launch {
        // 触发并处理接收的数据
        onlyReadFlow.collect {
            Log.d("liduozuishuai", "test2: $it")
        }
    }
    // 发送数据:2
    GlobalScope.launch {
        flow.emit(2)
    }
}

对于上面的代码,接收者1会依次打印出:3、1、2,接收者2会依次打印出1、2。

以上就是Koltin 协程异步热数据流的设计与使用讲解的详细内容,更多关于Koltin 协程异步热数据流的资料请关注编程网其它相关文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-移动开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯