Problem description
Libraries:
- r2dbc-postgresql-0.8.6.RELEASE
- r2dbc-pool-0.8.5.RELEASE
- r2dbc-spi-0.8.3.RELEASE
- postgresql-42.2.18
Problem: I am trying to do a batch insert with R2DBC (PostgreSQL), using the following code:
@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
        .flatMapMany(c -> Mono.from(c.beginTransaction())
            .thenMany(Flux.fromIterable(users)
                .map(u -> {
                    return Flux.from(c.createStatement("INSERT INTO public.users(name,age,salary) VALUES ($1,$2,$3)").returnGeneratedValues("id")
                        .bind(0, u.getName())
                        .bind(1, u.getAge())
                        .bind(2, u.getSalary()).execute());
                })
                .flatMap(result -> result)
                .map(result -> result.map((row, meta) -> {
                    return row.get("id", Long.class);
                }))
                .flatMap(Flux::from)
                .delayUntil(r -> c.commitTransaction())
                .doFinally((st) -> c.close())));
}
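The code executes one statement per user to insert it into the database and then reads back the generated user ID. It works as expected when the user list contains 255 entries or fewer. When the list contains 256 or more entries, the following exception is thrown: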
[5b38a8c6-2] There was an unexpected error (type=Internal Server Error,status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [dispatcherHandler]
|_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
When I investigated what was happening, I found that the exception is thrown from ReactorNettyClient.java. The implementation is:
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender, Supplier<Boolean> isConnected) {
    return Flux.create(sink -> {
        Conversation conversation = new Conversation(takeUntil, sink);
        // ensure ordering in which conversations are added to both queues.
        synchronized (this.conversations) {
            if (this.conversations.offer(conversation)) {
                sink.onRequest(value -> onRequest(conversation, value));
                if (!isConnected.get()) {
                    sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                    return;
                }
                Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
                    if (!isConnected.get()) {
                        sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                    }
                });
                sender.accept(requestMessages);
            } else {
                sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
            }
        }
    });
}
The error occurs when the queue would grow beyond 255 entries: Queue.offer returns false, and that causes the exception to be thrown.
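The rejecting behavior of a bounded queue can be reproduced in isolation. The following is a minimal sketch, not the driver's code: it uses `java.util.concurrent.ArrayBlockingQueue` as a stand-in for the driver's conversation queue, and the capacity of 255 is an assumption taken from the limit observed above rather than from the r2dbc-postgresql source. The class and method names are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

class BoundedQueueDemo {

    // Hypothetical capacity, chosen to match the limit observed in the question.
    static final int CAPACITY = 255;

    /** Offers `count` items without ever polling and returns how many were rejected. */
    static int countRejected(int count) {
        Queue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            // offer() never blocks: it returns false once the queue is full.
            if (!queue.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(255)); // prints 0
        System.out.println(countRejected(256)); // prints 1
    }
}
```

Nothing is removed from the queue in this sketch, which mirrors the situation where many statements are submitted on one connection before any of them has completed.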
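Sorry, my English is not very good. Please help me understand what is happening and how to solve it. I want to batch insert more than 100,000 records per request.
Thanks.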
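One direction that may help with very large lists is to avoid submitting one statement per user all at once, and instead split the list into chunks and send one multi-row INSERT per chunk, one chunk after another. The sketch below covers only the plain-Java part of that idea, under stated assumptions: the helper names `chunk` and `multiRowInsert` and the class `BatchInsertSql` are hypothetical, the chunk size is arbitrary, and the SQL uses PostgreSQL's `RETURNING id` clause in place of `returnGeneratedValues("id")`. How each chunk is bound and executed through R2DBC (for example sequentially with Reactor's `concatMap`) is left out and would need to be verified against the driver version in use.

```java
import java.util.ArrayList;
import java.util.List;

class BatchInsertSql {

    /** Splits `items` into consecutive chunks of at most `size` elements. */
    static <T> List<List<T>> chunk(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    /** Builds one multi-row INSERT with positional placeholders ($1, $2, ...) for `rows` users. */
    static String multiRowInsert(int rows) {
        if (rows <= 0) {
            throw new IllegalArgumentException("rows must be positive");
        }
        StringBuilder sql = new StringBuilder("INSERT INTO public.users(name,age,salary) VALUES ");
        int p = 1; // next placeholder number
        for (int r = 0; r < rows; r++) {
            if (r > 0) {
                sql.append(',');
            }
            sql.append("($").append(p++)
               .append(",$").append(p++)
               .append(",$").append(p++)
               .append(')');
        }
        return sql.append(" RETURNING id").toString();
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            ids.add(i);
        }
        System.out.println(chunk(ids, 1_000).size()); // prints 100
        System.out.println(multiRowInsert(2));
    }
}
```

With this shape, 100,000 users in chunks of 1,000 would result in 100 statements instead of 100,000, and each statement carries three bind parameters per row. The chunk size should be kept small enough that the number of bind parameters per statement stays within what the server and driver accept.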