使用 Java Reactor 并行运行 cassandra 查询

问题描述

我在尝试并行运行多个 cassandra 查询时遇到问题(Spring 5、Spring-Data、Cassandra 4、datastax 驱动程序 4.9.0)。

这个示例代码没问题

Mono<String> mono1 = Mono.just("1").log().subscribeOn(Schedulers.boundedElastic());
Mono<String> mono2 = Mono.just("2").log().subscribeOn(Schedulers.boundedElastic());
Mono.zip(Arrays.asList(mono2,mono1),result -> result).block();

日志显示每个单声道都在其线程中执行(boundedElastic-X)

18:46:12.866 [boundedElastic-1] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.Scalarsubscription)
18:46:12.866 [boundedElastic-2] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.Scalarsubscription)
18:46:12.869 [boundedElastic-1] INFO reactor.Mono.Just.2 - | request(unbounded)
18:46:12.869 [boundedElastic-2] INFO reactor.Mono.Just.1 - | request(unbounded)
18:46:12.869 [boundedElastic-1] INFO reactor.Mono.Just.2 - | onNext(2)
18:46:12.869 [boundedElastic-2] INFO reactor.Mono.Just.1 - | onNext(1)
18:46:12.869 [boundedElastic-1] INFO reactor.Mono.Just.2 - | onComplete()
18:46:12.869 [boundedElastic-2] INFO reactor.Mono.Just.1 - | onComplete()

但是当我用 cassandra 查询替换我的示例 Mono 时,该查询使用 ReactiveCassandraRepository for ex 插入记录

Mono<Content> mono1 = cassandraOperations.insert(entity1).log().subscribeOn(Schedulers.boundedElastic());
Mono<Content> mono2 = cassandraOperations.insert(entity2).log().subscribeOn(Schedulers.boundedElastic());
Mono.zip(Arrays.asList(mono1,mono2),result -> result).block();

我得到这些日志:

2021-03-21 18:49:14.769  INFO 5664 --- [oundedElastic-1] reactor.Mono.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2021-03-21 18:49:14.769  INFO 5664 --- [oundedElastic-2] reactor.Mono.MapFuseable.2 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2021-03-21 18:49:14.771  INFO 5664 --- [oundedElastic-2] reactor.Mono.MapFuseable.2 : | request(unbounded)
2021-03-21 18:49:14.771  INFO 5664 --- [oundedElastic-1] reactor.Mono.MapFuseable.1 : | request(unbounded)
2021-03-21 18:49:14.781  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.2 : | onNext(com.acme.content.entity.Content@36cdb2c5)
2021-03-21 18:49:14.781  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.2 : | onComplete()
2021-03-21 18:49:14.784  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.1 : | onNext(com.acme.content.entity.Content@50d5f4a4)
2021-03-21 18:49:14.784  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.1 : | onComplete()

OnSubscribe() 和 request() 方法按预期在专用线程 (oundedElastic-X) 内完成,但我很惊讶地看到 OnNext() 和 onComplete() 操作是在同一个线程内完成的(所以-io- 3).

我一定漏掉了一些东西,有人能解释一下两个样本中发生的差异吗?

谢谢!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)