问题描述
是否可以从webflux流量中读取大块数据? (除了使用delayElements之外)
例如我写完之后
Flux.range(1,10).doOnNext(System.out::println).take(5).subscribe();
有什么方法可以继续读取接下来的5个整数?
如果没有,那么消费者是否有其他选择来决定何时请求下一个数据?
编辑:
为澄清起见,我想先读取5个值,然后暂停,然后再读取接下来的5个值,而无需重新创建发射极通量。
解决方法
那么您需要一个成熟的异步订阅者对象,而不仅仅是一系列方法。
// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxSubscriberTest {
@Test
public void test10() {
FluxSubscriber subscriber = new FluxSubscriber();
Flux.range(1,10).subscribe(subscriber.inp);
subscriber.start();
boolean ok = subscriber.blockingAwait(5000);
Assert.assertTrue(ok);
}
static class FluxSubscriber extends Actor {
InpFlow<Integer> inp = new InpFlow<>(this,5);
int count = 0;
@Override
protected void runAction() throws Throwable {
if (inp.isCompleted()) {
System.out.println("input stream completed");
complete();
return;
}
Integer value = inp.remove();
System.out.println("value="+value);
if (++count==5) {
count = 0;
System.out.println("pause:");
delay(1000);
}
}
}
}
实际上,它先读取5个项目,然后在每次调用inp.remove()
之后一个接一个地读取。如果这不是您想要的,则可以扩展类InpFlow
来在调用Subscription.request()
时修改策略。
源代码可在https://github.com/akaigoro/df4j获得(是的,我是作者)。