在 IHostBuilder 托管服务中延迟和重新接收延迟消息

问题描述

如果 Azure 服务总线消息的处理依赖于其他资源,例如一个API或者一个数据库服务,并且这个资源不可用,不调用CompleteMessageAsync()不是一种选择,因为消息会立即再次收到,直到达到Max Delivery Count,然后再放入DLQ。如果某个 API 因维护而停机,我们希望在重试之前稍等片刻。

this question 的其中一个答案包含延迟和接收延迟消息的一般步骤。这比 Microsoft 的文档要好一点,但还不足以让我理解 API 的意图,以及它是如何在托管服务中实现的,该托管服务基本上整天都在 ServiceBusProcessor.StartProcessingAsync 中。

这是我的服务的基本结构:

public class ServiceBusWatcher : IHostedService,Idisposable
{
    public Task StartAsync(CancellationToken stoppingToken)
    {
        ReceiveMessagesAsync();
        return Task.CompletedTask;
    }

    private async void ReceiveMessagesAsync()
    {
        ServiceBusClient client = new ServiceBusClient(connectionString);
        processor = client.CreateProcessor(queueName,new ServiceBusProcessorOptions());
        processor.ProcessMessageAsync += MessageHandler;
        await processor.StartProcessingAsync();
    }

    async Task MessageHandler(ProcessMessageEventArgs args)
    {
         // a dependency is not available that allows me to process a message. so:
         await args.DeferMessageAsync(args.Message);       

一旦消息被延迟,我的理解是处理器将不会再访问它(或者会不会?)。相反,我必须使用 ReceiveDeferredMessageAsync() 来接收它,以及最初接收到的消息的序列号。

就我而言,在重试之前等待几分钟或几小时是有意义的。 这可以通过使用计时器和显式调用 ReceiveDeferredMessageAsync() 的单独服务来完成,而不是使用 ServiceBusProcessor。我还假设延迟消息序列号必须保存在非易失性存储中,以免丢失。

这听起来像是一种可行的方法吗?我不喜欢必须记住它的序列号,以便我可以在以后获取消息。它与首先使用消息队列带来的一切背道而驰。

或者,我可以不推迟,而是发布带有序列号的新“内部”消息,并使用 ScheduledEnqueueTimeUtc 属性延迟接收它。收到此消息后,我可以使用该序列号调用 ReceiveDeferredMessageAsync()获取原始消息。这从表面上看似乎很优雅,但如果依赖项的中断时间更长,消息可能会迅速增加

一个可以在没有其他服务的情况下工作的想法:我可以完成并重新发布消息的有效负载,并将 ScheduledEnqueueTimeUtc 设置为未来的时间,如对 the question I mentioned earlier 的另一个回答中所述。假设这行得通(微软的文档没有提到这个属性是做什么的),看起来简单干净,我喜欢简单。

你是怎么解决这个问题的?有没有更好/首选的方法来平衡低复杂性和高健壮性而不需要大量代码

解决方法

当您知道稍后要检索什么消息并且您的接收者将保存消息序列号以检索延迟消息时,延迟消息就起作用了。如果接收方无法保存消息序列号,则延迟消息是更好的选择。延迟消息意味着将原始消息数据复制到新安排的消息中并完成原始消息。这样,消费者既不必保留消息序列号,也不必启动特定消息的检索。