Quarkus BackPression管理

问题描述

我使用quarkus反应性消息和kafka获得了以下堆栈跟踪:

        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossthread.run(JBossthread.java:479)


//very very long sql function call

2020-08-04 12:05:35,739 ERROR [io.sma.rea.mes.kafka] (executor-thread-78) SRMSG18207: Unable to dispatch message to Kafka: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer is full due to lack of downstream consumption
        at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:80)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.context.SmallRyethreadcontext.lambda$withContext$0(SmallRyethreadcontext.java:217)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.context.SmallRyethreadcontext.lambda$withContext$0(SmallRyethreadcontext.java:217)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
        at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
        at io.smallrye.mutiny.operators.multi.builders.IntervalMulti$IntervalRunnable.run(IntervalMulti.java:77)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossthread.run(JBossthread.java:479)

这是我的代码

@Outgoing( "eqs-crossing-mid" )
    public Multi< EQSAlert > eqsCrossingTGV_MID(){

        final String series = CrossingEnum.TGV_MID.getSeries();
        final String equipment = CrossingEnum.TGV_MID.getEquipment();

        final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );

        log.info( "Incoming request for {} - {}",series,equipment);
        log.info( "Vehicle regex : {}",vehicleRegex );

        return Multi
                .createFrom()
                .ticks()
                .every(
                        Duration.ofSeconds( poolingInterval )
                )
                .onOverflow()
                .buffer(10)
                .concatMap(i -> {
                    final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client,equipment);

                    return crossingStateBySeriesAndEquipment.flatMap(crossingState ->
                            crossingState.isActive() ?
                                    EQSAlert.getEQSAlertBySeriesAndEquipment(
                                            client,vehicleRegex,equipment
                                    )
                                    :
                                    Multi.createFrom().empty()
                    );
                });
    }

如您所见,我每5秒钟执行一次sql函数调用(缓冲间隔) 我已经注意到,在提交错误版本的sql函数后,我收到了此错误。 通常情况下,调用的时间少于1秒,而函数最多需要2分钟才能发送响应。

所以我对sql函数进行了更正,现在一切正常。

我的帖子的目的是了解应用程序发生了什么? 是因为我的sql调用堆栈太大吗?

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)