使用 JMS 为 Azure 服务总线安排消息

问题描述

我想使用 JMS 向 Azure 服务总线发送预定消息。 我的代码基于 org.apache.qpid.jms.message.JmsMessage。我为给定的问题找到了一种解决方案,但它使用具有 org.apache.qpid.proton.message.Message.getMessageAnnotations(),它允许编辑消息注释并添加一些由 Azure 服务总线正确识别和处理的属性。我的消息实现缺少该方法

我在 node.js 中的官方文档和实现中发现,要使用 Azure 服务总线安排消息,您需要发送具有有效 json 的标头 brokerProperties/brokerProperties。 其他标头/属性将被标记Customer properties 并被 Azure 服务总线忽略。

official azure docs about JMS 表示 JMS API 不正式支持设置 ScheduledEnqueueTimeUtc。但是可以通过设置属性手动实现。

所以当我向队列发送消息时,我可以在 lambda 中对它进行后期处理并设置一些属性

jmstemplate.convertAndSend(queue,payload,message -> {
    var date = Date.from(zoneddatetime.Now(ZoneId.of("UTC")).plus(delay,ChronoUnit.MILLIS).toInstant());
    var brokerProps = Map.of("ScheduledEnqueueTimeUtc",date.toGMTString());
    message.setStringProperty(
        "brokerProperties",objectMapper.writeValueAsstring(brokerProps)
    );
    return message;
});

而且它不起作用。消息到达队列,但是当我尝试在 Azure 上的服务总线资源管理器上查看它时,它会在浏览器控制台中引发错误并且操作永远持续。我想设置该属性 brokerProperties 对服务总线有一些影响。 我还尝试发送一个带有日期作为字符串的地图(Azure 使用的日期格式),如 "ScheduledEnqueueTimeUtc","Thu,25 Mar 2021 12:54:00 GMT",但它也被服务总线识别为错误(窥视永远持续,错误在浏览器控制台被抛出)。

我尝试设置字符串属性,如 x-opt-scheduled-enqueue-timex-ms-scheduled-enqueue-time,我在 SO 的其他线程中找到了这些属性,但它们都不适用于我的示例。

我看到微软提供了一些 Java 库来与 Azure 服务总线通信,但我需要在我的代码中保持与云提供商的独立性,并且不包含任何额外的库。

是否有使用包 org.apache.qpid.jms.message.JmsMessage 中的 JMS 消息实现为 Azure 服务总线设置 brokerProperties 的示例?

解决方法

我的团队目前面临同样的问题。

我们发现在 MessageAnnotationsMap 中设置了 ScheduledEnqueueTimeUtc 属性。不幸的是,JMS 使用的 org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade 已将 getter 和 setter 设置为包私有。但我们发现您可以使用 setTracingAnnotation(String key,Object value) 方法。

示例:

public void sendDelayedMessage() {
    final var now = ZonedDateTime.now();
    jmsTemplate.send("test-queue",session -> {
        final var tenMinutesFromNow = now.plusMinutes(10);
        final var textMessage = session.createTextMessage("Hello Service Bus!");
        ((JmsTextMessage) textMessage).getFacade().setTracingAnnotation("x-opt-scheduled-enqueue-time",Date.from(tenMinutesFromNow.toInstant()));
        return textMessage;
    });
    log.info("Sent at: " + now);
}

证明: enter image description here

非常感谢我的teammate!!