Apache Camel 3.7.0 断路器和重新交付策略

问题描述

我们正在尝试将断路器 (resilience4j) 与重新交付政策结合起来(请参阅下面的路线)

<?xml version="1.0" encoding="UTF-8"?>
<route xmlns="http://camel.apache.org/schema/spring" id="asyncOneConsumer.out" streamCache="true">
   <from uri="kafka:asyncOneConsumer?brokers=localhost:9092&amp;headerDeserializer=#stringKafkaHeaderDeserializer" />
   <onException uSEOriginalMessage="true">
      <exception>com.myCompany.exception.RetryException</exception>
      <redeliveryPolicy delayPattern="0:100;1:200;2:300;4:400;5:600" logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" maximumRedeliveries="5" retryAttemptedLogLevel="DEBUG" />
      <to uri="direct:asyncOneConsumer.dlq.error.out" />
   </onException>
   <onException>
      <exception>java.lang.Exception</exception>
      <redeliveryPolicy logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" />
      <bean />
   </onException>
   <log loggingLevel="DEBUG" message="Consumed: ${body} for route asyncOneConsumer.out" />
   <process />
   <setHeader name="LOGGING_TYPE">
      <constant>OUTBOUND_REQUEST</constant>
   </setHeader>
   <process />
   <process />
   <removeHeaders pattern="CamelHttP*" />
   <removeHeader headerName="CamelServletcontextpath" />
   <setHeader name="CamelHttpMethod">
      <expressionDeFinition>org.apache.camel.model.ProcessorDeFinition$2@148b2e07</expressionDeFinition>
   </setHeader>
   <setHeader name="Content-Type">
      <expressionDeFinition>org.apache.camel.model.ProcessorDeFinition$2@75125d76</expressionDeFinition>
   </setHeader>
   <process />
   <process />
   <circuitBreaker inheritErrorHandler="true">
      <resilience4jConfiguration failureRateThreshold="50.0" minimumNumberOfCalls="3" permittednumberOfCallsInHalfOpenState="10" slidingWindowSize="10" slowCallDurationThreshold="60" slowCallRateThreshold="100.0" waitDurationInopenState="10">
         <timeoutEnabled>true</timeoutEnabled>
         <timeoutDuration>30000</timeoutDuration>
      </resilience4jConfiguration>
      <process />
      <setHeader name="LOGGING_TYPE">
         <constant>OUTBOUND_RESPONSE</constant>
      </setHeader>
      <process />
   </circuitBreaker>
</route>

我们注意到,如果交换在 CB 打开时处于重试过程中,则继续调用 onRedelivery 处理器(请参阅日志)。然而,实际执行出站调用的处理器并未被调用

[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #1 of 5 due to cause:  Error is retryable. HTTP_STATUS_CODE = null. ] 
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #2 of 5 due to cause:  Error is retryable. HTTP_STATUS_CODE = null. ] 
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #3 of 5 due to cause:  Error is retryable. HTTP_STATUS_CODE = null. ] 
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #4 of 5 due to cause:  CircuitBreaker 'circuitBreaker1' is OPEN and does not permit further calls]
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #5 of 5 due to cause:  CircuitBreaker 'circuitBreaker1' is OPEN and does not permit further calls] 

这确实令人困惑,尤其是当我们增加 kafka 消费者时。这是工作的方式吗?可以用其他方式处理吗?我们尝试了“retryUntil”,但这仅在合法重试期间调用(具有 RetryException 的那些 - 而不是具有 CB 异常的那些)。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...