问题描述
-
在Flux上使用Parallel时,我正在使用线程睡眠来停止线程一段时间,但是问题是,磁通量不等到线程睡眠时间后才在订阅的onComplete上执行。
List str =新的ArrayList (); str.add(“ spring”); str.add(“ webflux”); str.add(“ example”);
AtomicInteger num = new AtomicInteger(); ParallelFlux<Object> names = Flux.fromIterable(str) .log() .parallel(2) .runOn(Schedulers.boundedElastic()) .map( s-> { if(s.equalsIgnoreCase("webflux")) { try { System.out.println("waiting..."); Thread.sleep(1000); System.out.println("done..."); } catch (InterruptedException e) { // Todo Auto-generated catch block e.printstacktrace(); } } return s+" "+num.incrementAndGet(); }); names.subscribe(s -> { System.out.println("value "+s+" thread : "+Thread.currentThread().getName()); });
输出:
19:35:24.870 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
19:35:24.896 [main] INFO reactor.Flux.Iterable.1 - | request(256)
19:35:24.897 [main] INFO reactor.Flux.Iterable.1 - | onNext(spring)
19:35:24.898 [main] INFO reactor.Flux.Iterable.1 - | onNext(webflux)
19:35:24.898 [main] INFO reactor.Flux.Iterable.1 - | onNext(example)
waiting...
value spring 1 thread : boundedElastic-1
value example 2 thread : boundedElastic-1
19:35:24.899 [main] INFO reactor.Flux.Iterable.1 - | onComplete()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)