RxJS 过滤/分组去抖动 操作员使用 partitionOn 进行优先去抖动priorityDebounceTime 运算符使用 priorityDebounceTime 进行优先去抖动

问题描述

我正在尝试使用 RxJS 去抖动操作符,但我想自定义源的发射何时去抖动。

认情况下,来自去抖动窗口内源的任何发射将导致之前的发射被丢弃。根据源发射的值,我希望仅某些来自源的排放计入去抖动操作。

假设我有一个看起来像这样的对象:

{
  priority: 'low'    //can be 'low' or 'medium' or 'high
}

我希望去抖动按对象的优先级分组。这意味着只有当一个发射具有相同的优先级时,它才会被另一个发射去抖动。

即只有 'low' 排放可以消除 'low' 排放,并且只有 'high' 排放可以消除 'high' 排放。如果 'medium' 发射在 'low' 发射等待时到来,它不会导致 'low' 发射被丢弃。

这意味着如果我快速连续发射 'low''medium',两者都会通过。如果我快速连续发射两次 'low',那么只有最后一次会通过。

这是我想出的:

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000

$source.pipe(
    mergeMap(value => {
       
        // We start a race of the value with a delay versus any other emissions from the source with the same priority
        return race(
            timer(delay).pipe(mapTo(value)),$source.pipe(
                filter(v => v.priority === value.priority),)
        ).pipe(
            take(1),// If another emission with the same priority comes before the delay,the second racer it will win the race.
            // If no emission with the same priority comes,the first racer will win.
            //
            // If the first racer wins,this equality check is satisfied and the value is passed through.
            // If the second racer wins,the equality check fails and no value is emitted. Since this is a mergeMap,this whole process will start again for that emission.
            filter(v => v === value),)
    })
)

认为以上是正确的,但我想知道我是否遗漏了什么或使这种方式比需要的更复杂?上面的代码应该像合并 $low.pipe(debounceTime(delay)) $medium.pipe(debounceTime(delay))$high.pipe(debounceTime(delay)) 的三个独立流一样工作。

谢谢!!

解决方法

我认为您的回答有效。这也很清楚。但是,您必须确保您的 $source 是多播的。

我认为您的方法有一个缺点:

你做了很多额外的计算。如果您每秒对 1000 个值进行去抖动,则根据运行位置的不同,它可能会明显变慢。

每个流值可以在任意数量的比赛中。来自不同优先级的输入仍然相互竞争,当下一个值开始竞争时,前一个竞争不会停止,因此如果一次到达很多值,您可能会遇到计时器/竞争的爆炸式增长。

设置和删除了很多额外的计时器。在您的情况下,您应该最多需要三个计时器,每个计时器都会在相同优先级的新值到达时重置。

如果您的代码不在关键路径上,那可能不是问题。否则,还有其他方法。不过,我想到的那个在代码方面有点笨重。

对流进行分区

这是我的大脑如何解决这个问题的。我创建了一个操作符,它执行 RxJS partition 操作符的功能,但允许您划分为两个以上的流。

我的方法在内部处理多播,因此源可以是任何内容(热、冷、多播或非多播)。它(内部)为每个流设置一个主题,然后您可以像往常一样使用 RxJS 的 debounceTime。

不过有一个缺点。在您的方法中,您可以随意添加一个新的优先级字符串,它应该会继续工作。 {priority: "DucksSayQuack"} 的对象将相互去抖动而不影响其他优先级。这甚至可以即时完成。

下面的 partitionOn 操作符需要提前知道分区。对于您所描述的情况,它应该具有相同的输出并且启动效率更高。

这样更好吗?我不知道,这是解决同一问题的有趣且不同的方法。此外,我认为 partitionOn 运算符的用途比分区去抖动更多。

操作员

/***
 * Create a partitioned stream for each value where a passed 
 * predicate returns true
 ***/
function partitionOn<T>(
  input$: Observable<T>,predicates: ((v:T) => boolean)[]
): Observable<T>[] {
  const partitions = predicates.map(predicate => ({
    predicate,stream: new Subject<T>()
  }));

  input$.subscribe({
    next: (v:T) => partitions.forEach(prt => {
      if(prt.predicate(v)){
        prt.stream.next(v);
      } 
    }),complete: () => partitions.forEach(prt => prt.stream.complete()),error: err => partitions.forEach(prt => prt.stream.error(err))
  });

  return partitions.map(prt => prt.stream.asObservable());
}

使用 partitionOn 进行优先去抖动

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;

const priorityEquals = a => b => a === b?.priority;

merge(
  ...partitionOn(
    $source,[priorityEquals('low'),priorityEquals('medium'),priorityEquals('high')]
  ).map(s => s.pipe(
    debounceTime(1000)
  ))
);

为您的流添加时间戳

此方法与您的方法非常相似,可让您再次使用优先级字符串。这有一个类似的问题,即每个值都被扔进一个计时器,并且计时器不会在新值到达时取消。

然而,通过这种方法,取消不必要的计时器的途径更加清晰。您可以在 priorityTimeStamp 映射中将订阅对象与时间戳一起存储,并确保在新值到达时取消订阅。

我真的不知道这会对性能造成什么影响,我认为 JavaScript 的事件循环非常健壮/高效。这种方法的好处是您无需支付多播成本。这实际上只是一个流,使用查找图来决定哪些内容被过滤,哪些内容没有。

priorityDebounceTime 运算符

function priorityDebounceTime<T>(
  dbTime: number,priorityStr = "priority"
): MonoTypeOperatorFunction<T> {

  return s => defer(() => {
    const priorityTimeStamp = new Map<string,number>();
    return s.pipe(
      mergeMap(v => {
        priorityTimeStamp.set(v[priorityStr],Date.now());
        return timer(dbTime).pipe(
          timestamp(),filter(({timestamp}) => 
            timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
          ),mapTo(v)
        )
      })
    )
  });

}

使用 priorityDebounceTime 进行优先去抖动

这显然要简单一些:

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;

$source.pipe(
  priorityDebounceTime(delay)
).subscribe(console.log);