以线程安全的方式运行组合链

问题描述

我有一个使用Operation子类和OperationQueue实现的现有流程。我正在尝试将合并作为学习练习重新实现。

简化后的当前版本如下:

func testFunction(completionHandler: (Result) -> Void) {
    let op = LongRunningOperation(object: self.stateObject)

    op.completionHandler = { [uNowned op] in
        let result = op.result
        self.stateObject.update(with: result)

        completionHandler(result)
    }

    localSerialQueue.addOperation(op)
}

一些重要方面。 LongRunningOperation是异步的。根据{{​​1}},可能需要调出主队列以完成登录过程。仅在stateObject上访问stateObject。另外,为了明确起见,还可能有对localSerialQueue的其他并发调用,因此由队列+操作提供的序列化是必不可少的。

可变状态潜在的并发调用的组合对于该问题绝对至关重要。

为了说明这个问题,我制作了一个示例,证明,该合并管道不是原子执行的。任何其他结果都会令我感到惊讶,但我只是想验证一下。多个线程可以同时执行管道的各个阶段,这违反了我们的前提条件检查。

runoperation

非常清楚,我的问题不是关于如何建立一个合并管道。它涉及Combine同步原语的语义,以及如何保护管道中访问的可变状态。而且,可能的解决方案是将Combine与另一种形式的同步(如锁)结合使用来处理这种访问模式。没关系!我只是好奇是否其他人遇到了这种问题,是否有更惯用的方式,或者我是否可以以某种方式重新构造问题。

解决方法

Combine的全部要点是它一步一步地排列了步骤。直到上一步已发出信号 it 已执行(通过将值沿管道传递)后,才能执行一个步骤。因此,仅使用Combine,就可以使事物“原子化”。这就是重点。

这与队列无关。您可以指定一个调度队列,也可以在管道过程中切换调度队列,但这并不重要。切换队列只是流水线中的又一步。

您显示的代码几乎是胡说八道,其中大部分是不必要的。您不会在另一个发布商中间打电话。您不要在管道中间调用store。您通常不需要致电subscribe。除非您要切换线程(实际上是调度队列),否则您不会调用receive(on:)

因此,您构建了一个管道。首先是发布者,然后是一系列的运营商。然后,在两端,有一个订户(接收器或分配器)和一个store(in:)使它全部保持活动,就是这样。

有些运算符用于描述您可能想到的每种情况和拓扑。特别是(由于这是您似乎最困惑的部分),根据目标是什么,在管道中间有多种处理异步的方法。以下是其中一些:

  • 如果在管道的中间,仅在整个上游管道产生值之后,您需要另一个发布者才能启动,则可以使用flatMap运算符使该发布者存在,以响应收到该值。

  • 或者,如果一个发布者只应在另一个发布者结束后才开始,但是您 不需要从第一者传递到第二者,请使用{{1 }}运算符。

  • 或者,如果两个发布者可以独立运作,但在两个发布者都发布之前不能继续进行操作,请使用append运算符。

以此类推;有很多很多类似的东西。