问题描述
我想用Spring集成创建一个简单的IntegrationFlow,但遇到了困难。
我想创建一个集成流程,该流程从Rabbit Mq中的队列获取消息并将消息发布到端点Rest。我想根据发布的结果手动ack
。
集成流程的典型行为如下:
- 我在队列中收到一条消息。
- Spring会检测到它,获取消息并将其发布到Rest端点。
- 终点以200码作为响应。
- Spring集成确认消息。
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAckNowledgeMode(AckNowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(),StandardCharsets.UTF_8);
httpentity<String> requestBody = new httpentity<String>(msgString,headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL,requestBody,String.class);
System.out.println(msgString);
})
.get();
解决方法
此用例无需使用手动确认模式;如果他的休息呼叫正常返回,则容器将确认该消息;如果引发异常,则容器将保留该消息并将其重新发送。
如果您使用手动方式,则Channel
和deliveryTag
邮件标题中的AmqpHeaders.CHANNEL
和AmqpHeaders.DELIVERY_TAG
可用,您可以调用basicAck
或{{ 1}}上的通道(您必须向入站适配器添加错误通道以处理错误。