如何使用Camel和IBM MQ

问题描述

我试图使用连接到IBM MQ的Apache Camel在Java Spring Boot中完成事务性JMS客户端。此外,当消息处理失败时,客户端需要应用指数退避重新传送行为。原因:来自MQ的消息需要进行处理并将其转发到可能因维护而停机的多个小时的外部系统。对我来说,使用交易来至少保证一次处理担保似乎是合适的解决方案。

我已经研究了这个主题很多小时,却找不到解决方案。我将从现在拥有的东西开始:

  @Bean
  UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter ()
      throws IOException {
    MQConnectionFactory factory = new MQConnectionFactory();
    factory.setCCDTURL(tabFilePath);

    UserCredentialsConnectionFactoryAdapter adapter =
        new UserCredentialsConnectionFactoryAdapter();
    adapter.setTargetConnectionFactory(factory);
    adapter.setUsername(userName);
    bentechConnectionFactoryAdapter.setPassword(password);

    return adapter;
  }

  @Bean
  PlatformTransactionManager jmsTransactionManager(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter) {
    JmsTransactionManager txMgr = new JmsTransactionManager(uccConnectionFactoryAdapter);
    return txMgr;
  }

  @Bean()
  CamelContextConfiguration contextConfiguration(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter,@Qualifier("jmsTransactionManager") @Autowired PlatformTransactionManager txMgr) {
    return new CamelContextConfiguration() {
      @Override
      public void beforeApplicationStart(CamelContext context) {
        JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(uccConnectionFactoryAdapter,txMgr);
        // required for consumer-level redelivery after rollback
        jmsComponent.setCacheLevelName("CACHE_CONSUMER");
        jmsComponent.setTransacted(true);
        jmsComponent.getConfiguration().setConcurrentConsumers(1);

        context.addComponent("jms",jmsComponent);
      }

      @Override
      public void afterapplicationStart(CamelContext camelContext) {
        // Do nothing
      }
    };
  }

// in a route builder
...
from("jms:topic:INPUT_TOPIC?clientId=" + CLIENT_ID + "&subscriptionDurable=true&durableSubscriptionName="+ SUBSCRIPTION_NAME)
    .transacted()
    .("direct:processMessage");
...

我能够通过集成测试来验证交易行为。如果在消息处理期间发生未处理的异常,则事务将回滚并重试。问题是,每秒每秒几次立即重试,可能会给IBM MQ管理器和外部系统带来很大的负担。

对于ActiveMQ,重新交付策略很容易实现,网上有很多示例。 ActiveMQConnectionFactory一个setRedeliveryPolicy方法,也就是说,ActiveMQ客户端库内置了重新交付逻辑。根据我所知,这与Camel的Transactional Client EIP文档一致,其中指出:

以事务方式进行的重新交付不是由Camel处理,而是由后备系统(事务管理器)处理。在这种情况下,您应诉诸支持系统来配置重新交付。

我绝对不能弄清楚的是如何为IBM MQ实现相同的目标。 IBM的MQConnectionFactory不支持重新交付策略。实际上,在MQ知识中心中搜索redeliverypolicy会完全显示... drumroll ... 0次命中。我什至仔细研究了MQConnectionFactory的实现,也没有发现任何东西。

我研究的另一个支持系统是JmsTransactionManager搜索“ jmstransactionmanager重新交付策略”或“ jmstransactionmanager指数补偿”也没有发现任何有用的信息。有人谈论TransactionTemplateAbstractMessageListenerContainer,但1)我看不到与重新交付政策有任何联系,2)我不知道它们如何与Camel和JMS交互。

那么,有人知道如何使用Apache Camel和IBM MQ实施指数退避重新交付策略吗?

结束语:骆驼在errorHandleronException支持重新交付策略,与交易/连接支持系统中的重新交付策略 不同。这些处理程序使用故障状态的“ Exchange”对象在故障点重试,而无需回滚并从路由开始处重新处理消息。交易在整个租用期间都保持活动状态,并且仅在errorHandleronException放弃时才发生回滚。这不是我想要的可能会持续多个小时的重试。

解决方法

看起来像@JoshMc指向我正确的方向。我设法实现了一个RoutePolicy,该延迟随着交付的增加而延迟了重新交付。我已经运行了几个小时的测试会话,并重新发送了同一条消息,以查看是否存在诸如内存泄漏,MQ连接耗尽之类的问题。我没有发现任何问题。与MQ管理器有两个稳定的TCP连接,并且Java进程的内存使用在近距离范围内移动。

import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Session;
import lombok.extern.log4j.Log4j2;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Route;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.support.RoutePolicySupport;

@Log4j2
public class ExponentialBackoffPolicy extends RoutePolicySupport implements CamelContextAware {
  final static String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
  private CamelContext camelContext;

  @Override
  public void setCamelContext(CamelContext camelContext) {
    this.camelContext = camelContext;
  }

  @Override
  public CamelContext getCamelContext() {
    return this.camelContext;
  }

  @Override
  public void onExchangeDone(Route route,Exchange exchange) {
    try {
      // ideally we would check if the exchange is transacted but onExchangeDone is called after the
      // transaction is already rolled back,and the transaction context has already been removed.
      if (exchange.getException() == null)
      {
        log.debug("No exception occurred,skipping route suspension.");
        return;
      }

      int deliveryCount = getRetryCount(exchange);
      int redeliveryDelay = getRedeliveryDelay(deliveryCount);
      log.info("Suspending route {} for {}ms after exception. Current delivery count {}.",route.getId(),redeliveryDelay,deliveryCount);

      super.suspendRoute(route);
      scheduleWakeup(route,redeliveryDelay);
    } catch (Exception ex) {
      // only log exception and let Camel continue as of this policy didn't exist.
      log.error("Exception while suspending route",ex);
    }
  }

  void scheduleWakeup(Route route,int redeliveryDelay) {
    Timer timer = new Timer();
    timer.schedule(
        new TimerTask() {
          @Override
          public void run() {
            log.info("Resuming route {} after redelivery delay of {}ms.",redeliveryDelay);
            try {
              resumeRoute(route);
            } catch (Exception ex) {
              // only log exception and let Camel continue as of this policy didn't exist.
              log.error("Exception while resuming route",ex);
            }
            timer.cancel();
          }
        },redeliveryDelay);
  }

  int getRetryCount(Exchange exchange) {
    Message msg = exchange.getIn();
    return (int) msg.getHeader(JMSX_DELIVERY_COUNT,1);
  }

  int getRedeliveryDelay(int deliveryCount) {
    // very crude backoff strategy for now,will need to refine later
    if (deliveryCount < 10) return 1000;
    if (deliveryCount < 20) return 5000;
    if (deliveryCount < 30) return 20000;
    return 60000;
  }
}

要注意的一件事是JMSXDeliveryCount头来自MQ管理器,并且据此计算重新交付延迟。当您在消息永久失败的情况下使用ExponentialBackoff策略重新启动应用程序时,在重新启动消息后,它将立即尝试重新处理该消息,但是如果再次失败,则将延迟时间设置为与重新交付的总数相对应,而不是重新开始最初的延迟很短。