R2DBC Statement.fetchsize 精确度的工作原理

问题描述

我正在使用 r2dbc-postgresql 驱动程序。假设我们有一个包含 1000 条记录的表,并且 fetchSize 为 100:

connectionMono.flatMapMany(
                connection -> connection
                        .createStatement("select age from users")
                        .fetchSize(100)
                        .execute())

会执行多少网络调用?我知道使用 JDBC Statement.SetFetchsize 时,驱动程序将分 10 个批次获取所有行,每批次 100 行。

解决方法

查看 r2dbc 驱动程序中的代码,行为是相同的:它按块获取指定大小的行,因此在您的情况下为 100。

这是处理ExtendedQueryMessageFlow中的方法的代码:

    /**
     * Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
     *
     * @param bindFlow  the initial bind flow
     * @param client    client to use
     * @param portal    the portal
     * @param fetchSize fetch size per roundtrip
     * @return the resulting message stream
     */
    private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow,Client client,String portal,int fetchSize) {

        DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
        FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
        AtomicBoolean isCanceled = new AtomicBoolean(false);

        return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal,fetchSize),Flush.INSTANCE)).concatWith(requestsProcessor))
            .handle((BackendMessage message,SynchronousSink<BackendMessage> sink) -> {
                if (message instanceof CommandComplete) {
                    requestsSink.next(new Close(portal,PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof ErrorResponse) {
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof PortalSuspended) {
                    if (isCanceled.get()) {
                        requestsSink.next(new Close(portal,PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                    } else {
                        requestsSink.next(new Execute(portal,fetchSize));
                        requestsSink.next(Flush.INSTANCE);
                    }
                } else {
                    sink.next(message);
                }
            })
            .as(flux -> Operators.discardOnCancel(flux,() -> isCanceled.set(true)));
    }