问题描述
我需要将我的消息从 azure 主题订阅发送到死信队列,以防在读取和处理来自主题的消息时出现任何错误。所以我尝试测试将消息直接推送到 DLQ。
我的示例代码如下
static void sendMessage()
{
// create a Service Bus Sender client for the queue
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
.topicName(topicName)
.buildClient();
// send one message to the topic
senderClient.sendMessage(new ServiceBusMessage("Hello,World!"));
}
static void resceiveAsync() {
ServiceBusReceiverAsyncclient receiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.topicName(topicName)
.subscriptionName(subName)
.buildAsyncclient();
// receive() operation continuously fetches messages until the subscription is disposed.
// The stream is infinite,and completes when the subscription or receiver is closed.
disposable subscription = receiver.receiveMessages().subscribe(message -> {
System.out.printf("Id: %s%n",message.getMessageId());
System.out.printf("Contents: %s%n",message.getBody().toString());
},error -> {
System.err.println("Error occurred while receiving messages: " + error);
},() -> {
System.out.println("Finished receiving messages.");
});
// Continue application processing. When you are finished receiving messages,dispose of the subscription.
subscription.dispose();
// When you are done using the receiver,dispose of it.
receiver.close();
}
我尝试获取死信队列路径
String dlq = EntityNameHelper.formatDeadLetterPath(topicName);
我得到了死信队列的路径,例如 = "mytopic/$deadletterqueue"
但是在将路径作为主题名称传递时它不起作用。它抛出一个实体主题未找到异常。
有没有人可以给我建议
参考: How to move error message to Azure dead letter queue using Java?
How to push the failure messages to Azure service bus Dead Letter Queue in Spring Boot Java?
解决方法
您可能知道,如果在处理过程中抛出异常,并且超过最大传递计数,则消息将自动移至死信队列。如果您想显式地将消息移动到 DLQ,您也可以这样做。一个常见的情况是,如果您知道消息因其内容而永远不会成功。
您不能直接向 DLQ 发送新消息,因为那样的话系统中就会有两条消息。您需要对父实体调用特殊操作。此外,<topic path>/$deadletterqueue
不起作用,因为这将是所有订阅的 DLQ。正确的实体路径是这样构建的:
<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue
此示例代码适用于队列,但您应该能够很容易地将其调整到主题:
// register the RegisterMessageHandler callback
receiver.registerMessageHandler(
new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json")) {
// ...
} else {
return receiver.deadLetterAsync(message.getLockToken());
}
return receiver.completeAsync(message.getLockToken());
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable,ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},// 1 concurrent call,messages are auto-completed,auto-renew duration
new MessageHandlerOptions(1,false,Duration.ofMinutes(1)),executorService);