如何在不中断可观察流订阅的情况下向主题发送不同的可观察结果?

问题描述

使用简洁的示例编辑了问题。我认为这将使人们更容易理解。请注意,此示例是超级简化的,通常我在不同组件之间分层放置,但是对于这个问题就足够了。

使用此组件。它使用获取的对象的名称,以及获取一个对象的按钮。为了获得下一个REST请求的价值,我只知道订阅答案,别无其他方法,我想要的是拥有“ combineLatest”之类的东西,但为了“未来”,所以我可以合并最新的流

import { Component,VERSION,OnInit } from '@angular/core';
import { BehaviorSubject,Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'my-app',templateUrl: './app.component.html',styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  
    private readonly PEOPLE_API_ENDPOINT = `https://swapi.dev/api/people/`;

   private characterSubject : BehaviorSubject<any> = new BehaviorSubject<any>({name: 'loading'});
  private currentCharacter: number = 1;
  
  character$ : Observable<any> = this.characterSubject.asObservable();
  
  constructor(
    private http: HttpClient
  ) {}
    
  ngOnInit() {
    this.updateCurrentCharacter();
  }

  nextCharacter() : void {
    this.currentCharacter ++;
    this.updateCurrentCharacter();
  }

  //I would want to avoid subscribing,instead
  //I would like some sort of operation to send the stream
  //emissions to the subject. As to not break the observable
  //chain up until presentation,like the best practices say.
  private updateCurrentCharacter() : void {
    this.fetchCharacter(this.currentCharacter)
      .subscribe( 
        character => this.characterSubject.next(character)
      );
  }

  private fetchCharacter (id: number) : Observable<any> {
        return this.http.get(this.PEOPLE_API_ENDPOINT + `${id}/`);
  }
}
<span>{{ character$ | async }} </span>

<button (click)="nextCharacter()">Next character</button>

Online demo

有没有办法做到这一点?做类似“ emitIn(characterSubject)”的事情。我认为没有什么比这更像动态地向源中添加源排放了。

解决方法

如果我理解正确,那么您有一项服务,该服务的某种方法会触发http调用,例如dataRepository.fetch(id),并且您有不同的组件,需要在调用响应到达时做出反应。

在这种情况下,有很多方法可以处理此类要求,一种方法是使用公开为服务公共属性的主题,这是我想知道的。

要实现此行为,您所需要的代码就是您所需要的,换言之,就可以了

dataRepository.fetch(id)
  .subscribe(
    newData => currentData.next(newData)
  )

如果您想使其更加完整并同时管理errorcomplete情况,则可以这样编写

dataRepository.fetch(id)
  .subscribe(currentData)

在最后一种形式中,您要传递currentData作为subscribe的Observer参数。 currentData虽然也是可观察的,因此可以nexterrorcomplete来使用。

如果使用ReplaySubject,则可以添加存储最后结果的可能性,并将其显示在通知结果后创建的组件中。

,

可观察到的原始组件是动态更改的,但这确实很危险(任何管道都可能破坏任何可观察的对象)。我需要另一种不叫管道的东西。

我不同意。您可以使用根据输入数据返回可观察物的函数轻松创建动态可观察物,并且不会中断。

function getUserFriendsById(userId: string): Observable<User[]>{
return service.getProfileById(userId).pipe(
  mergeMap(user=>{
    return getUsersByArrayOfIds(user.friends);
  })
 )
}

getUserFriendsById('exampleUserId').subscribe(friends=>{...});

但是,如果您认为如果内部发生故障,则只需使用CatchError。

function getUserFriendsById(userId: string): Observable<User[]>{
return service.getProfileById(userId).pipe(
  mergeMap(user=>{
    return getUsersByArrayOfIds(user.friends).pipe(
      catchError(err => {
      return of([]) // will return empty friend list if this APi call fails.
    }))
  }),catchError(err=>{
   return of([]); // will return empty friend list if this APi call fails.
  })
 )
}

getUserFriendsById('exampleUserId').subscribe(friends=>{...});

我们之所以使用mergeMap仅仅是因为我们需要第一个Observable的数据来发送第二个请求。

如果您正在寻找的是将这些可观察值从基数中混合出来的方法,那么这将使所有rxjs运算符无用,因为这就是它们的作用。

但是,如果您不想将数据发送到currentData Observable ,并且您知道数据源,那么您甚至不需要为其创建单独的Observable 。仅使用管道即可获取数据。

currentData = dataRepository.fetch(id);
// at some point when you need currentData data you will subscribe to it.

// is same as 

dataRepository.fetch(id)
  .subscribe(
    newData => currentData.next(newData)
  )

或者甚至在您需要通过dataRepository.fetch(id)响应调用另一个Observable时

dataRepository.fetch(id).pipe(
mergeMap(fetchedId=>{
    return getUserById(fetchedId);
  })
)

但是,如果您想更深入地创建流而不传递数据,我想您会感到失望的,因为这是一般工作中的可观察性。而且我猜想这种方法非常有限且难以维护,因为最终您需要将不同的请求发送到多个API并收集数据,从而导致流程中断。 https://www.learnrxjs.io/learn-rxjs/operators/creation/create

// RxJS v6+
import { Observable } from 'rxjs';
/*
  Create an observable that emits 'Hello' and 'World' on  
  subscription.
*/
const hello = Observable.create(function(observer) {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

希望对您有所帮助,但如果您不是您要找的东西,请告诉我。