RxJava3 - 如何在使用共享运算符时将 doFinally 与线程安全操作一起使用时避免死锁?

问题描述

我正在尝试根据 id 将同一个 Flowable 共享给多个订阅者,而每个订阅者都可以在需要时取消订阅以取消订阅。如果某个 id 的所有订阅者都取消订阅,然后另一个订阅者尝试订阅该 id,则应该创建一个新订阅。下面的代码是用 Kotlin 编写的,试图实现这个功能。

class SharedFlowProvider() {
    private val flowProvider = FlowProvider()
    private val eventFlowById = HashMap<String,Flowable<ComputedProperties>>()

    @Synchronized
    override fun subscribeToProperties(subscriber: CancellableSubscriber<ComputedProperties>,id: String){
        eventFlowById.computeIfAbsent(id) { buildFlowable(id) }
                .subscribe(orderBasedSubscriber)
    }

    private fun buildFlowable(id: String) = 
        flowProvider.getFlowable(id)
            .doFinally { removeSubscription(id) }
            .share()

    @Synchronized
    private fun removeSubscription(id: String) {
        eventFlowById.remove(id)
    }
}

当在同一 ID 上取消后跟订阅时会出现问题。 由于共享返回的 FlowableRefCount 的工作方式,我发现这种方法可能存在死锁。在底层实现中,FlowableRefCount 在其 subscribeActual 和 cancel 方法中使用同步机制,虽然乍一看并不明显,但 doFinally 方法的操作在该锁定机制内运行。这有点违反直觉,因为在 doFinally 文档中它说:

请注意 onFinally 操作在订阅之间共享,因此应该是线程安全的。

因此,出现了以下场景:

  • 通过取消锁定 FlowableRefCount 实例
  • subscribeToProperties 使用与取消中相同的 ID 调用
  • subscribeToProperties 在调用 subscribe(然后 subscribeActual)时被阻塞,因为已经通过取消在 FlowableRefCount 上获取了锁
  • 调用 removeOrderSubscription 被阻止,因为 SharedFlowProvider 上的锁定已经通过 subscribeToProperties 方法获取

我还尝试通过以下方式使用 ConcurrentHashMap 从 subscribeToProperties 方法中分离锁定:

private val eventFlowById = ConcurrentHashMap<String,Flowable<ComputedProperties>>()

override fun subscribeToProperties(subscriber: CancellableSubscriber<ComputedProperties>,id: String){
    val flow = eventFlowById.computeIfAbsent(id) { buildFlowable(id) }
    flow.subscribe(orderBasedSubscriber)
    //the flow associated with an id could be removed by a cancel done before the subscribe call,so we need to make sure it is added to the map
    eventFlowById.computeIfAbsent(id) { flow }
}

但是在这种方法中,如果我们在方法的最后一行之前进行了订阅,然后快速取消,我们最终可能会将流放入地图中,即使我们没有订阅者.

关于如何在没有死锁或竞争条件的情况下实现此功能的一些想法,我将不胜感激。

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...