问题描述
我是RXJS库的新手,并试图弄清楚如何正确使用Observable和Subjects。 我正在尝试与模式设计观察者进行比较。 在某个时候,我有一个疑问,来自库RXJS的Observable实例是否是Observer模式设计的特例?
解决方法
根据定义,Observable
是一个实体,它会随着时间的推移而发出数据。这听起来有些含糊,但同时也很有趣。
我认为,所有RxJS的魔力都是通过链接列表实现的。
每当使用Observable
创建new Observable(subscriber => {})
时,都在定义源或链表的 HEAD 节点。另外,您是否想知道为什么将参数称为subscriber
或observer
?我也会尝试分享我的看法。
主链表是在Observable.pipe()
的帮助下创建的:
pipe(...operations: OperatorFunction<any,any>[]): Observable<any> {
return operations.length ? pipeFromArray(operations)(this) : this;
}
export function pipeFromArray<T,R>(fns: Array<UnaryFunction<T,R>>): UnaryFunction<T,R> {
if (fns.length === 0) {
return identity as UnaryFunction<any,any>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any,fn: UnaryFunction<T,R>) => fn(prev),input as any);
};
}
protected lift<R>(operator?: Operator<T,R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
您知道,在RxJS中有许多运算符。 operator
是一个函数,它返回另一个函数,其参数是Observable
(类型为T
),返回值也是Observable
(类型为R
)。
例如map()
:
export function map<T,R>(project: (value: T,index: number) => R,thisArg?: any): OperatorFunction<T,R> {
return function mapOperation(source: Observable<T>): Observable<R> {
if (typeof project !== 'function') {
throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
}
return lift(source,new MapOperator(project,thisArg));
};
}
所以,当你拥有
const src$ = new Observable(s => /* ... */)
.pipe(
map(/* ... */)
)
一些事情会发生:
- 首先,它将创建
Observable
实例;提供的回调(在这种情况下为s => ...
)将存储在_subscribe
属性中 -
pipe()
被调用;它将返回fns[0]
,在这种情况下将返回mapOperation
函数 -
mapOperation
将以Observable
实例作为其参数来调用(来自pipeFromArray(operations)(this)
);调用时,它将调用source.lift(new MapOperator(project,thisArg));
;Observable.lift()
是将节点添加到此链接列表的方法;如您所见,一个节点(除HEAD
之外)保留着source
和代表它的operator
当您订阅src$
时,将基于此列表创建另一个。在这个例子中,每个节点将是一个Subscriber
。此列表的创建基于以下事实:每个operator
must have a call
method
export interface Operator<T,R> {
call(subscriber: Subscriber<R>,source: any): TeardownLogic;
}
export class MapOperator<T,R> implements Operator<T,R> {
constructor(private project: (value: T,private thisArg: any) {
}
call(subscriber: Subscriber<R>,source: any): any {
return source.subscribe(new MapSubscriber(subscriber,this.project,this.thisArg));
}
}
Subscriber
节点之间的关系在Observable.subscribe()
在这种情况下,s
(上面的示例)中的new Observable(s => ...)
参数将是MapSubscriber
。
似乎我偏离了这个问题,但是通过上面的解释,我想证明这里Observer
模式中没有多少。
可以通过extends Observable
的Subject
实现此模式:
export class Subject<T> extends Observable<T> implements SubscriptionLike { }
,这意味着您可以使用Subject.pipe(...)
和Subject.subscribe(subscriber)
。 Subject
为了实现此模式所做的就是使用 custom _subscribe
方法:
_subscribe(subscriber: Subscriber<T>): Subscription {
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.isStopped) {
subscriber.complete();
return Subscription.EMPTY;
} else {
// !!!
this.observers.push(subscriber);
return new SubjectSubscription(this,subscriber);
}
}
如您所见,Subject
类会跟踪其观察者(订阅者),因此,当它发出值时,Subject.next()
的所有观察者都会收到它:
next(value: T) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value!);
}
}
}
作为副节点,Subject
也可以充当Subscriber
,因此您不必一直手动调用Subject.{next,error,complete}()
。您可以使用类似的方法实现这一目标
src$.pipe(subjectInstance);