Problem description
I am using the r2dbc-postgresql driver. Suppose we have a table with 1000 records and a fetchSize
of 100:
connectionMono.flatMapMany(
    connection -> connection
        .createStatement("select age from users")
        .fetchSize(100)
        .execute())
How many network calls will be made? I know that with JDBC Statement.setFetchSize,
the driver fetches all rows in 10 batches of 100 rows each.
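Solution
Looking at the code in the r2dbc driver, the behavior is the same: it fetches rows in chunks of the specified size, so 100 rows per chunk in your case.
This is the method in ExtendedQueryMessageFlow that handles it: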
/**
 * Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
 *
 * @param bindFlow  the initial bind flow
 * @param client    client to use
 * @param portal    the portal
 * @param fetchSize fetch size per roundtrip
 * @return the resulting message stream
 */
private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {
    DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
    FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
    AtomicBoolean isCanceled = new AtomicBoolean(false);
    return client.exchange(bindFlow
            .concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE))
            .concatWith(requestsProcessor))
        .handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
            if (message instanceof CommandComplete) {
                requestsSink.next(new Close(portal, PORTAL));
                requestsSink.next(Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                requestsSink.next(Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requestsSink.next(new Close(portal, PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                } else {
                    requestsSink.next(new Execute(portal, fetchSize));
                    requestsSink.next(Flush.INSTANCE);
                }
            } else {
                sink.next(message);
            }
        })
        .as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
}
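
To make the round-trip count concrete, here is a minimal sketch that models the loop above without any database or driver. The class FetchSizeModel, its nested Portal, and countExecuteMessages are hypothetical names made up for illustration; they are not part of r2dbc-postgresql. The sketch also assumes that the server only reports CommandComplete when an Execute returns fewer rows than requested, which is my understanding of the PostgreSQL extended query protocol, so treat the numbers as an estimate rather than a guarantee.

```java
// Simplified model of the cursored fetch loop; NOT the driver's actual code.
final class FetchSizeModel {

    /** Hypothetical stand-in for a server-side portal that holds a fixed number of rows. */
    static final class Portal {
        private int remaining;

        Portal(int totalRows) {
            this.remaining = totalRows;
        }

        /**
         * Handles one Execute(maxRows) message.
         * Returns true for CommandComplete and false for PortalSuspended.
         * Assumption: the portal is only known to be exhausted when it
         * returns fewer rows than the client asked for.
         */
        boolean execute(int maxRows) {
            int sent = Math.min(remaining, maxRows);
            remaining -= sent;
            return sent < maxRows;
        }
    }

    /** Counts the Execute messages the client sends until it sees CommandComplete. */
    static int countExecuteMessages(int totalRows, int fetchSize) {
        if (totalRows < 0 || fetchSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and fetchSize > 0");
        }
        Portal portal = new Portal(totalRows);
        int executes = 0;
        boolean complete;
        do {
            executes++; // first Execute, or a re-Execute after PortalSuspended
            complete = portal.execute(fetchSize);
        } while (!complete);
        return executes;
    }

    public static void main(String[] args) {
        System.out.println(countExecuteMessages(1000, 100)); // 11
        System.out.println(countExecuteMessages(950, 100));  // 10
    }
}
```

Under these assumptions, 1000 rows with a fetchSize of 100 take 10 Execute messages that each return 100 rows and end in PortalSuspended, plus one more that returns no rows and ends in CommandComplete. In the real flow the first Execute is sent together with the bind flow, and the final Close and Sync are sent after CommandComplete, so the exact number of packets on the wire can differ from this count.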