观察者模式设计和RXJS库

问题描述

我是RXJS库的新手,并试图弄清楚如何正确使用Observable和Subjects。 我正在尝试与模式设计观察者进行比较。 在某个时候,我有一个疑问,来自库RXJS的Observable实例是否是Observer模式设计的特例?

解决方法

根据定义,Observable是一个实体,它会随着时间的推移而发出数据。这听起来有些含糊,但同时也很有趣。

我认为,所有RxJS的魔力都是通过链接列表实现的。

每当使用Observable创建new Observable(subscriber => {})时,都在定义源或链表的 HEAD 节点。另外,您是否想知道为什么将参数称为subscriberobserver?我也会尝试分享我的看法。

主链表是在Observable.pipe()的帮助下创建的:

pipe(...operations: OperatorFunction<any,any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

export function pipeFromArray<T,R>(fns: Array<UnaryFunction<T,R>>): UnaryFunction<T,R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any,any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any,fn: UnaryFunction<T,R>) => fn(prev),input as any);
  };
}

Observable.lift()

protected lift<R>(operator?: Operator<T,R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

您知道,在RxJS中有许多运算符。 operator是一个函数,它返回另一个函数,其参数是Observable(类型为T),返回值也是Observable (类型为R)。

例如map()

export function map<T,R>(project: (value: T,index: number) => R,thisArg?: any): OperatorFunction<T,R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    if (typeof project !== 'function') {
      throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
    }
    return lift(source,new MapOperator(project,thisArg));
  };
}

所以,当你拥有

const src$ = new Observable(s => /* ... */)
  .pipe(
    map(/* ... */)
  )

一些事情会发生:

  • 首先,它将创建Observable实例;提供的回调(在这种情况下为s => ...)将存储在_subscribe属性中
  • pipe()被调用;它将返回fns[0],在这种情况下将返回mapOperation函数
  • mapOperation将以Observable实例作为其参数来调用(来自pipeFromArray(operations)(this));调用时,它将调用source.lift(new MapOperator(project,thisArg));; Observable.lift()是将节点添加到此链接列表的方法;如您所见,一个节点(除HEAD之外)保留着source和代表它的operator

当您订阅src$时,将基于此列表创建另一个。在这个例子中,每个节点将是一个Subscriber。此列表的创建基于以下事实:每个operator must have a call method

export interface Operator<T,R> {
  call(subscriber: Subscriber<R>,source: any): TeardownLogic;
}

MapOperator is no exception

export class MapOperator<T,R> implements Operator<T,R> {
  constructor(private project: (value: T,private thisArg: any) {
  }

  call(subscriber: Subscriber<R>,source: any): any {
    return source.subscribe(new MapSubscriber(subscriber,this.project,this.thisArg));
  }
}

Subscriber节点之间的关系在Observable.subscribe()

中建立

在这种情况下,s(上面的示例)中的new Observable(s => ...)参数将是MapSubscriber

似乎我偏离了这个问题,但是通过上面的解释,我想证明这里Observer模式中没有多少。

可以通过extends ObservableSubject实现此模式:

export class Subject<T> extends Observable<T> implements SubscriptionLike { }

,这意味着您可以使用Subject.pipe(...)Subject.subscribe(subscriber)Subject为了实现此模式所做的就是使用 custom _subscribe方法:

_subscribe(subscriber: Subscriber<T>): Subscription {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  } else if (this.hasError) {
    subscriber.error(this.thrownError);
    return Subscription.EMPTY;
  } else if (this.isStopped) {
    subscriber.complete();
    return Subscription.EMPTY;
  } else {
    
    // !!!
    this.observers.push(subscriber);
    return new SubjectSubscription(this,subscriber);
  }
}

如您所见,Subject类会跟踪其观察者(订阅者),因此,当它发出值时,Subject.next()的所有观察者都会收到它:

next(value: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value!);
    }
  }
  }

作为副节点,Subject也可以充当Subscriber,因此您不必一直手动调用Subject.{next,error,complete}()。您可以使用类似的方法实现这一目标

src$.pipe(subjectInstance);

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...