如何使用 Java 将错误消息移动到 Azure 死信队列主题 - 订阅?

问题描述

我需要将我的消息从 azure 主题订阅发送到死信队列,以防在读取和处理来自主题的消息时出现任何错误。所以我尝试测试将消息直接推送到 DLQ。

我的示例代码如下

static void sendMessage()
{
    // create a Service Bus Sender client for the queue 
    ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sender()
            .topicName(topicName)
            
            .buildClient();

    // send one message to the topic
    
    
    senderClient.sendMessage(new ServiceBusMessage("Hello,World!"));    


}
static void resceiveAsync() {
    ServiceBusReceiverAsyncclient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .receiver()
            .topicName(topicName)
            .subscriptionName(subName)
            .buildAsyncclient();

        // receive() operation continuously fetches messages until the subscription is disposed.
        // The stream is infinite,and completes when the subscription or receiver is closed.
        disposable subscription = receiver.receiveMessages().subscribe(message -> {

            System.out.printf("Id: %s%n",message.getMessageId());
            System.out.printf("Contents: %s%n",message.getBody().toString());
        },error -> {
                System.err.println("Error occurred while receiving messages: " + error);
            },() -> {
                System.out.println("Finished receiving messages.");
            });

        // Continue application processing. When you are finished receiving messages,dispose of the subscription.
        subscription.dispose();

        // When you are done using the receiver,dispose of it.
        receiver.close();
    
    
    
}

我尝试获取死信队列路径

    String dlq = EntityNameHelper.formatDeadLetterPath(topicName);

我得到了死信队列的路径,例如 = "mytopic/$deadletterqueue"

但是在将路径作为主题名称传递时它不起作用。它抛出一个实体主题未找到异常。

有没有人可以给我建议

参考: How to move error message to Azure dead letter queue using Java?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#moving-messages-to-the-dlq

How to push the failure messages to Azure service bus Dead Letter Queue in Spring Boot Java?

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-topics-subscriptions-legacy#receive-messages-from-a-subscription

解决方法

您可能知道,如果在处理过程中抛出异常,并且超过最大传递计数,则消息将自动移至死信队列。如果您想显式地将消息移动到 DLQ,您也可以这样做。一个常见的情况是,如果您知道消息因其内容而永远不会成功。

您不能直接向 DLQ 发送消息,因为那样的话系统中就会有两条消息。您需要对父实体调用特殊操作。此外,<topic path>/$deadletterqueue 不起作用,因为这将是所有订阅的 DLQ。正确的实体路径是这样构建的:

<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue

https://github.com/Azure/azure-service-bus/blob/master/samples/Java/azure-servicebus/DeadletterQueue/src/main/java/com/microsoft/azure/servicebus/samples/deadletterqueue/DeadletterQueue.java

此示例代码适用于队列,但您应该能够很容易地将其调整到主题:

    // register the RegisterMessageHandler callback
    receiver.registerMessageHandler(
            new IMessageHandler() {
                // callback invoked when the message handler loop has obtained a message
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    // receives message is passed to callback
                    if (message.getLabel() != null &&
                            message.getContentType() != null &&
                            message.getLabel().contentEquals("Scientist") &&
                            message.getContentType().contentEquals("application/json")) {

                        // ...
                    } else {
                        return receiver.deadLetterAsync(message.getLockToken());
                    }
                    return receiver.completeAsync(message.getLockToken());
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable,ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
            },// 1 concurrent call,messages are auto-completed,auto-renew duration
            new MessageHandlerOptions(1,false,Duration.ofMinutes(1)),executorService);

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...