如何在 takeUntil rxjs 函数之后采取行动

问题描述

因此,如果运行的函数运行时间足够长以触发在计时器上运行的takenUntil 函数,我试图让布尔值为真。

这是代码

start = this.http.get(environment.shochat_content_creator_set_valid_stream_start).pipe(
    tap(() => console.log('Stream start'))
  );

  poll = this.http.get(environment.check_if_stream_is_active_on_mux).pipe(
    tap(() => {
        this.streamready = true;
        return 0;
      }
      ),catchError(error => {
      console.log(error);
      return EMPTY;
    })
  );

  startastream(){
    const endtimer = timer(60000);
    this.streampollsubscription = this.start.pipe(
      switchMap(() => timer(0,5000).pipe(
        tap(() => console.log('Polling every 5s')),mergeMap(() => this.poll)
      )),takeuntil(endtimer)
    ).subscribe();

  }

本质上,如果 takeuntil 确实被触发,我希望将布尔值设置为 true。

timeout = true;

我一直在看这个stackoverflow帖子

Do some action after takeUntil

但事情并不像我想要的那么清楚。

解决方法

当条件 (takeUntil) 触发时,您可以使用 merge 运算符并重复使用您的 true 条件来创建映射的 endtimer 值:

const { Subject,merge } = rxjs;
const { takeUntil,mapTo } = rxjs.operators;

const source$ = new Subject();
const condition$ = new Subject();

// Use the merge to work around your completed observable
const result$ = merge(
  // When your condition fires map this event to true
  condition$.pipe(mapTo(true)),// Your already existing pipe in wich the takeUntil lives
  source$.pipe(
    takeUntil(condition$)
  )
)

result$.subscribe(console.log)

source$.next(1);
source$.next(2);
condition$.next();
source$.next(3);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

仅供参考:我不确定此解决方案是否适用于您的应用程序上下文,因为您没有显示声明和设置 timeout 的位置。

,

takeUntil 完成可观察对象,因此要在完成后执行操作,您可以在以下几个地方执行此操作:

  1. completion 处理程序中:
this.start.pipe(
    switchMap(() => timer(0,5000).pipe(
        tap(() => console.log('Polling every 5s')),mergeMap(() => this.poll))
    ),takeUntil(endtimer)
)
.subscribe(
    next  => console.log('handling next value'),error => console.log('handling error'),()    => this.timeout = true    // <--- this fires after observable completes
);
  1. 使用 finalize 运算符:
this.start.pipe(
    switchMap(() => timer(0,takeUntil(endtimer),finalize(() => this.timeout = true)
)
.subscribe();

注意:这些解决方案不是正是您所要求的。当 takeUntil 触发时,它们确实会触发,但它们也会因流完成的任何其他原因而触发。我不认为区别对您的情况很重要,但想在整个问题的上下文中提及。

正如 Sang Dang 在评论中提到的,你也可以从“计时器关闭时(而不是我目前提到的,“当observable 完成"),您只需向计时器添加 tap 即可完成。

const endtimer = timer(60000).pipe(
    tap(() => this.timeout = true)
);