JMS配置退避/重试而不阻塞onMessage

问题描述

javax.JMS版本2.0.1

提供程序:ibm.mq v9.0

框架:Java Spring启动

据我所知,onMessage()是异步的。我成功地重试了邮件发送。但是,消息重新发送在消息失败后立即发生。理想情况下,我希望重试以滑动窗口样式进行。 20秒后第一次重试,40秒后第二次重试。

如何在没有 的情况下实现这个 ,我认为它将阻塞整个Java线程,而这根本不是我想要的东西? / p>

代码是这样的

from datetime import datetime
from django.http import HttpResponse
from django.urls import reverse
from django.shortcuts import render,redirect
from django.contrib.auth.forms import AuthenticationForm
from django.contrib import messages,auth
from django.contrib.auth import authenticate,login,logout
from django.contrib import admin
from django.conf import settings
from django.contrib.auth.models import User


 
def about(request):
    return render(request,'pages/admin.html')
def afterlogin(request):
    return render(request,'pages/afterlogin.html')
def login(request):
    if request.method == 'POST': 
        user = auth.authenticate(username=request.POST['username'],password=request.POST['password'])
        if user is not None:
            auth.login(request,user)
            return redirect('/afterlogin')
        else:
            messages.error(request,'Invalid credentials')
            return redirect('login')
            # User is authenticate  
    else:
        return render(request,'pages/login.html')
  

def r(request):
    return redirect('afterlogin')```

解决方法

我建议您在重试逻辑中使用指数补偿,但是您需要使用Delivery Delay功能。

定义一个自定义的JmsTemplate,它将使用消息中的delay属性,您还应该在message属性中添加重试计数,以便可以根据需要延迟20、40、80、160等

public class DelayedJmsTemplate extends JmsTemplate {
    public static String DELAY_PROPERTY_NAME = "deliveryDelay";

    @Override
    protected void doSend(MessageProducer producer,Message message) throws JMSException {
        long delay = -1;
        if (message.propertyExists(DELAY_PROPERTY_NAME)) {
            delay = message.getLongProperty(DELAY_PROPERTY_NAME);
        }
        if (delay >= 0) {
            producer.setDeliveryDelay(delay);
        }
        if (isExplicitQosEnabled()) {
            producer.send(message,getDeliveryMode(),getPriority(),getTimeToLive());
        } else {
            producer.send(message);
        }
    }
}

定义将具有重新加入消息功能的组件,您可以在基本消息侦听器中定义此接口。 handleException方法应该完成入队和计算延迟等所有任务。您可能并不总是对入队感兴趣,在某些情况下,您也会丢弃消息。

您可以在此处看到类似的后处理逻辑 https://github.com/sonus21/rqueue/blob/4c9c5c88f02e5cf0ac4b16129fe5b880411d7afc/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java

@Component
@Sl4j
public class MessageListener {
    private final JmsTemplate jmsTemplate;

    @Autowired
    public MessageListener(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
    @JmsListener(destination = "myDestination")
    public void onMessage(Message message) throws JMSException {
        try {
            // do something
        } catch (Exception e) {
            handleException("myDestination",message,e);
        }
    }

    // Decide whether the message should be ignored due to many retries etc
    private boolean shouldBeIgnored(String destination,Message message) {
        return false;
    }

    // add logic to compute delay
    private long getDelay(String destination,Message message,int deliveryCount) {
        return 100L;
    }

    private void handleException(String destination,Exception e) throws JMSException {
        if (shouldBeIgnored(destination,message)) {
            log.info("destination: {},message: {} is ignored ",destination,e);
            return;
        }
        if (message.propertyExists("JMSXDeliveryCount")) {
            int t = message.getIntProperty("JMSXDeliveryCount");
            long delay = getDelay(destination,t + 1);
            message.setLongProperty(DELAY_PROPERTY_NAME,delay);
            message.setIntProperty("JMSXDeliveryCount",t + 1);
            jmsTemplate.send(destination,session -> message);
        } else {
            // no delivery count,is this the first message or should be ignored?
        }
    }
}