Answer
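In RxJava, the operators groupBy and window return observables that can be subscribed to only once. When subscribed, such an observable replays its accumulated contents to that sole subscriber and then switches to "hot" mode.
This is a trade-off between two alternatives: returning a fully hot observable and risking missed values, or returning an unbounded replaying observable that allows any number of subscribers but retains its accumulated contents indefinitely.
The middle ground, a single-subscriber, cold-to-hot observable, is considered the least surprising behavior, and it lets developers apply further operators to pick any point between the two extremes: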
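// Fully hot: each window is published and connected immediately,
// so subscribers that arrive late may miss values.
source.window(1, TimeUnit.SECONDS)
    .map(w -> w.publish())
    .doOnNext(w -> w.connect())
    .subscribe(...)

// Unbounded replay: each window can be subscribed to any number of times,
// but its contents are retained.
source.window(1, TimeUnit.SECONDS)
    .map(w -> w.cache())
    .subscribe(...)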
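To make the single-subscriber, cold-to-hot behavior concrete, here is a minimal plain-Java model of it. The class name SingleSubscriberBuffer and its methods are hypothetical and chosen only for illustration. This is not RxJava's implementation, and it deliberately ignores threading, completion, and error signals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of a single-subscriber, cold-to-hot source (not RxJava's actual code). */
final class SingleSubscriberBuffer<T> {
    private final List<T> buffer = new ArrayList<>(); // values seen before anyone subscribed
    private Consumer<T> subscriber;                   // the one and only subscriber, once set

    /** Upstream pushes a value: buffer it while cold, forward it once hot. */
    void onNext(T value) {
        if (subscriber == null) {
            buffer.add(value);
        } else {
            subscriber.accept(value);
        }
    }

    /** The first call replays the buffer and switches to hot mode; any later call fails. */
    void subscribe(Consumer<T> s) {
        if (subscriber != null) {
            throw new IllegalStateException("Only one subscriber allowed!");
        }
        subscriber = s;
        for (T value : buffer) {
            s.accept(value);
        }
        buffer.clear(); // nothing is retained after the replay
    }

    public static void main(String[] args) {
        SingleSubscriberBuffer<Integer> window = new SingleSubscriberBuffer<>();
        window.onNext(1);
        window.onNext(2); // buffered: nobody is listening yet
        window.subscribe(v -> System.out.println("first: " + v)); // replays 1 and 2
        window.onNext(3); // delivered live to the first subscriber
        try {
            window.subscribe(v -> System.out.println("second: " + v));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the second subscription is rejected
        }
    }
}
```

In this model, anything that subscribes to the same window twice, for example by using it as an input to two different combining steps, hits the exception on the second subscription.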
Question
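I'm using RxJava to compute the normalized autocorrelation of some sensor data on Android. Strangely, my code throws an exception ("java.lang.IllegalStateException: Only one subscriber allowed!") and I'm not sure what to make of it: I know this exception can be thrown when a GroupedObservable is subscribed to by more than one subscriber, but I don't think I'm using anything like that anywhere.
Below is the method that (most likely) triggers the exception: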
// mean, standardDeviation and flatten are helper methods not shown here.
public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
    Observable<Float> laggedObservable = observable.skip(lag);

    // Means and standard deviations of the original and the lagged series
    Observable<Float> meanObservable = mean(observable, lag);
    Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);
    Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
    Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);

    // Deviations of the values from the two means
    Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });
    Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    // Pairwise products of the deviations
    Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float laggedValue) {
            return value * laggedValue;
        }
    });

    // Element-wise sum across overlapping windows of size lag
    Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
        @Override
        public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
            if (memoObservable == null) return observable;
            return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
                @Override
                public Float call(Float memo, Float value) {
                    return memo + value;
                }
            });
        }
    }));

    // Normalization factor: lag * sigma * sigma_lagged
    Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float standardDeviation, Float laggedStandardDeviation) {
            return lag * standardDeviation * laggedStandardDeviation;
        }
    });

    return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float autoCorrelation, Float normalization) {
            return autoCorrelation / normalization;
        }
    });
}
Here is the stack trace I get:
java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183)
at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102)
at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418)
at android.os.MessageQueue.nativePollOnce(Native Method)
at android.os.MessageQueue.next(MessageQueue.java:138)
at android.os.Looper.loop(Looper.java:123)
at android.app.ActivityThread.main(ActivityThread.java:5146)
at java.lang.reflect.Method.invokeNative(Native Method)
at java.lang.reflect.Method.invoke(Method.java:515)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566)
at dalvik.system.NativeStart.main(Native Method)
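I don't think I'm doing anything strange here: just some zips, reduces, scans and flatMaps.
Am I missing something completely obvious, is there some hidden rule I'm breaking, or is this a bug in RxJava? Thanks!
P.S. If any code is missing that you need to reach a conclusion, just ask and I'll post it!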
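When debugging a reactive pipeline like the one in the question, a plain batch computation over an array can serve as a reference for the expected numbers. The sketch below is an assumption about the intended quantity: it correlates the first n - lag samples with the same series shifted by lag, using population standard deviations. The class AutoCorrelationReference and its methods are hypothetical, and the windowing differs from the sliding windows used in the question's code.

```java
/** Batch reference for a normalized autocorrelation; a sketch, not the reactive pipeline itself. */
final class AutoCorrelationReference {

    /** Mean of x[from, from + count). */
    static double mean(double[] x, int from, int count) {
        double sum = 0;
        for (int i = from; i < from + count; i++) {
            sum += x[i];
        }
        return sum / count;
    }

    /** Population standard deviation of x[from, from + count). */
    static double standardDeviation(double[] x, int from, int count) {
        double m = mean(x, from, count);
        double sumOfSquares = 0;
        for (int i = from; i < from + count; i++) {
            sumOfSquares += (x[i] - m) * (x[i] - m);
        }
        return Math.sqrt(sumOfSquares / count);
    }

    /**
     * Correlation between x[0, n) and x[lag, lag + n), where n = x.length - lag.
     * Returns NaN when either segment is constant (zero standard deviation).
     */
    static double normalizedAutoCorrelation(double[] x, int lag) {
        int n = x.length - lag;
        if (lag < 0 || n <= 0) {
            throw new IllegalArgumentException("lag must be in [0, x.length)");
        }
        double m = mean(x, 0, n);
        double laggedM = mean(x, lag, n);
        double s = standardDeviation(x, 0, n);
        double laggedS = standardDeviation(x, lag, n);

        double sum = 0;
        for (int i = 0; i < n; i++) {
            sum += (x[i] - m) * (x[i + lag] - laggedM);
        }
        return sum / (n * s * laggedS);
    }

    public static void main(String[] args) {
        double[] signal = {1, 2, 1, 2, 1, 2}; // period-2 signal
        System.out.println(normalizedAutoCorrelation(signal, 2)); // in phase with itself
        System.out.println(normalizedAutoCorrelation(signal, 1)); // in anti-phase with itself
    }
}
```

For a signal with period 2, the result should be close to 1 at lag 2 and close to -1 at lag 1, which makes a quick sanity check before comparing against the streaming version.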