如何结束/关闭MutableSharedFlow?

问题描述

SharedFlow刚刚在1.4.0-M1协程中引入,它意在替换所有broadcastChannel实现(如design issue定义中所述)。

我有一个用例,其中我使用broadcastChannel表示传入的Web套接字框架,以便多个侦听器可以“订阅”这些框架。 当我移到SharedFlow时遇到的问题是,当我收到关闭帧或上游错误(我想通知所有订阅者该流)时,我无法“结束”流结束了。

当我想有效地“关闭SharedFlow时如何使所有订阅终止? 有没有办法区分正常关闭和异常关闭间的区别? (如渠道)

如果MutableSharedFlow不允许将流的结尾传达给订户,那么如果broadcastChannel被弃用/删除,该怎么办?

解决方法

SharedFlow文档描述了您的需求:

请注意,大多数终端运算符(例如Flow.toList)在应用于共享流时也不会完成,但是可以在共享流上使用流截断运算符(例如Flow.take和Flow.takeWhile)将其转换为完成操作一个。

SharedFlow不能像BroadcastChannel一样关闭,永远不能代表失败。如果需要,应明确实现所有错误和完成信号。

基本上,您将需要引入一个可以从共享流中发出的特殊对象,以指示该流已结束,在使用者端使用takeWhile可以使它们一直发出,直到接收到该特殊对象为止。 / p>

,

我认为一个可能的解决方案是创建一个布尔标志 ValueError: Shape must be rank 4 but is rank 3 for '{{node sequential/random_rotation/transform/ImageProjectiveTransformV3}} = ImageProjectiveTransformV3[dtype=DT_FLOAT,fill_mode="WRAP",interpolation="BILINEAR"](rescaling/add,sequential/random_rotation/rotation_matrix/concat,sequential/random_rotation/transform/strided_slice,sequential/random_rotation/transform/fill_value)' with input shapes: [299,299,3],[299,8],[2],[]. 并仅公开公开带有 isValid 的流。然后,当您想关闭所有订阅者时,只需调用 .takeWhile { isValid }isValid = false

可能的实现:

sFlow.emit()

编辑:在我的情况下,private var isValid = true // In real scenario use atomic boolean private val _sharedFlow = MutableSharedFlow<Unit>() val sharedFlow: Flow<Unit> get() = _sharedFlow.takeWhile { isValid } suspend fun cancelSharedFlow() { isValid = false _sharedFlow.emit(Unit) } 总是挂起,所以我不得不使用 .emit()(这不适合许多用例)。不确定问题是在这个例子中还是在我的应用程序的其他地方。如果您发现问题,请发表评论:)