如何创建一个联合的可观察对象,使之耗尽,但总是在电流结束后触发最后一个事件?

问题描述

我要解决以下理论问题,请注意,对于我描述的示例问题,可能会有更通用的解决方案,但是我特别想知道如何创建具有这些属性的联合可观察对象。

我有一个可观察到的更改事件,这些事件应该触发保存操作(这可以观察到保存成功)。

  1. 我需要确保最后最后一个保存事件一定会执行
  2. 保存本身是一个复杂的过程,需要一些时间,并且在保存事件期间,不应执行其他保存操作

使用 exhaust 或exhaustMap几乎可以满足我的要求:它可以确保在保存过程中不会触发其他事件。 虽然 concat 或concatMap可以确保执行最后一个操作,但是我会做很多不必要的保存操作。

重新描述一下:如何创建一个观察对象,将耗尽并保持最后一个事件?

解决方法

您可以将throttle与配置leading: true,trailing: true一起使用来发出第一个事件,然后再发出任何事件,直到发出可观察到的事件,然后再发出在此期间收到的最后一个事件。参见How does throttleTime operator's config parameter work? (ThrottleConfig)

映射到之后要执行的可观察对象(保存操作)。保存操作完成后,使用Subjectfinalize结束调节间隔。

是否使用mergeMapexhaustMapconcatMap等映射到您的内部可观察对象都没关系,因为throttle运算符仅在您操作时发出下一个事件内部可观察完成。

如果使用此逻辑创建自定义运算符函数,则必须用defer包装代码,以便不同的订户不会共享相同的主题,而是每个订户都有自己的新主题。

export function exhaustMapWithTrailing<T,R>(
  project: (value: T,index: number) => ObservableInput<R>
): OperatorFunction<T,R> {
  return (source): Observable<R> => defer(() => {
    const release = new Subject()

    return source.pipe(
      throttle(() => release,{ leading: true,trailing: true }),exhaustMap((value,index) => from(project(value,index)).pipe(
        finalize(() => release.next())
      ) as Observable<R> )
    )
  })
}
events$.pipe(
  exhaustMapWithTrailing(event => save(event))
)

https://stackblitz.com/edit/rxjs-5k6egc?file=index.ts

此代码改编自https://github.com/ReactiveX/rxjs/issues/5004

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...