为什么RxJava中的onBackpressureDrop会发生这种情况

问题描述

在这里,我有一个可流动的对象,它每毫秒发射一次元素。

   Flowable<Long> source = Flowable.interval(1,TimeUnit.MILLISECONDS).take(14000);
        source.map(e->{
            Log.d("TAGBefore","before " + e);
            return e;
        })
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .subscribe(
                        e-> {
                            Log.d("TAGNext","onNext: " + e);
                            Thread.sleep(100);
                        },e-> Log.d("TAGError","error: " + e),()-> Log.d("TAGComplete","onComplete")
        );

我使用过之前的信息来知道可观察对象发出元素的时刻,我的疑问是,从127(观察者已满)开始,它会达到9688

   TAGNext: onNext: 125
   TAGNext: onNext: 126
   TAGNext: onNext: 127
   TAGNext: onNext: 9668
   TAGNext: onNext: 9669
   TAGNext: onNext: 9670

但是,当我进一步检查控制台(使用其他搜索过滤器)时,我意识到发布127时它已经去了12794,所以不是9688而不应该是12794或接近的数字? ,谢谢。

   TAGBefore: before: 12793
   TAGBefore: before: 12794
   TAGNext:   onNext: 127
   TAGBefore: before: 12795
   TAGBefore: before: 12796
 

但是,当我更多地检查控制台(使用其他搜索过滤器)时,我意识到发出127时它已经达到12794了,因此代替9688而不应该是12794或接近可观察的数字,而不是9794。已经免费了吗?如果出现错误,我要澄清一下,我是RxJava的新手,谢谢。

解决方法

observeOn具有默认的128元素缓冲区,该缓冲区可以很快填充。

每100毫秒将其耗尽一次,直到仅剩32个元素为止,此时它又请求96个元素。因此,要让更多项目通过onBackpressureDrop大约需要9600毫秒,因此您看到TAGNext: onNext: 9668

排空第127个元素时,您需要运行大约12700毫秒,因此从生产者端看到TAGBefore: before: 12795

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...