春季整合手动接力

问题描述

我想用Spring集成创建一个简单的IntegrationFlow,但遇到了困难。

我想创建一个集成流程,该流程从Rabbit Mq中的队列获取消息并将消息发布到端点Rest。我想根据发布的结果手动ack

集成流程的典型行为如下:

  1. 我在队列中收到一条消息。
  2. Spring会检测到它,获取消息并将其发布到Rest端点。
  3. 终点以200码作为响应。
  4. Spring集成确认消息。

如果端点响应时出现错误代码,我希望能够取消或重试。

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAckNowledgeMode(AckNowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);

/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container)) 
    .handle(msg ->
    {
        String msgString = new String((byte[]) msg.getPayload(),StandardCharsets.UTF_8);
        httpentity<String> requestBody = new httpentity<String>(msgString,headers);
        restTemplate.postForObject(ENDPOINT_LOCAL_URL,requestBody,String.class);
        System.out.println(msgString);
    })
    .get();

解决方法

此用例无需使用手动确认模式;如果他的休息呼叫正常返回,则容器将确认该消息;如果引发异常,则容器将保留该消息并将其重新发送。

如果您使用手动方式,则ChanneldeliveryTag邮件标题中的AmqpHeaders.CHANNELAmqpHeaders.DELIVERY_TAG可用,您可以调用basicAck或{{ 1}}上的通道(您必须向入站适配器添加错误通道以处理错误。