分块地从Flux <Integer>读取

问题描述

是否可以从webflux流量中读取大块数据? (除了使用delayElements之外)

例如我写完之后

Flux.range(1,10).doOnNext(System.out::println).take(5).subscribe();

有什么方法可以继续读取接下来的5个整数?

如果没有,那么消费者是否有其他选择来决定何时请求下一个数据?

编辑:

为澄清起见,我想先读取5个值,然后暂停,然后再读取接下来的5个值,而无需重新创建发射极通量。

解决方法

那么您需要一个成熟的异步订阅者对象,而不仅仅是一系列方法。

// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxSubscriberTest {

    @Test
    public  void test10() {
        FluxSubscriber subscriber = new FluxSubscriber();
        Flux.range(1,10).subscribe(subscriber.inp);
        subscriber.start();
        boolean ok = subscriber.blockingAwait(5000);
        Assert.assertTrue(ok);
    }

    static class FluxSubscriber extends Actor {
        InpFlow<Integer> inp = new InpFlow<>(this,5);
        int count = 0;

        @Override
        protected void runAction() throws Throwable {
            if (inp.isCompleted()) {
                System.out.println("input stream completed");
                complete();
                return;
            }
            Integer value = inp.remove();
            System.out.println("value="+value);
            if (++count==5) {
                count = 0;
                System.out.println("pause:");
                delay(1000);
            }
        }
    }
}

实际上,它先读取5个项目,然后在每次调用inp.remove()之后一个接一个地读取。如果这不是您想要的,则可以扩展类InpFlow来在调用Subscription.request()时修改策略。

源代码可在https://github.com/akaigoro/df4j获得(是的,我是作者)。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...