问题描述
我正在使用带有 mq-jms-spring-boot-starter
的 spring boot 创建一个 JMS 侦听器应用程序,该应用程序从队列中读取消息、处理它并将消息转发到另一个队列。
如果出现有害消息,我会尝试生成警报。但是,为了不对同一消息生成多个警报,我正在考虑将 JMSXDeliveryCount
与 BOTHRESH
值进行比较,并在发送到 BOQ 之前的最后一次重新传送中生成警报。
BOTHRESH
和 BOQNAME
是为源队列配置的。
@JmsListener(destination = "${sourceQueue}")
public void processMessages(Message message) {
TextMessage msg = (TextMessage) message;
int boThresh;
int redeliveryCount;
try {
boThresh = message.getIntProperty("<WHAT COMES HERE>");
redeliveryCount = message.getIntProperty("JMSXDeliveryCount");
String processedMessage = this.processMessage(message);
this.forwardMessage("destinationQueue",processedMessage);
} catch (Exception e) {
if (redeliveryCount >= boThresh) {
//generate alert here
}
}
}
我应该如何在此处获取 BOTHRESH
的值?有可能吗?我尝试使用 getPropertyNames()
方法获取所有可用属性,以下是我看到的所有属性。
JMS_IBM_Format
JMS_IBM_PutDate
JMS_IBM_Character_Set
JMSXDeliveryCount
JMS_IBM_MsgType
JMSXUserID
JMS_IBM_Encoding
JMS_IBM_PutTime
JMSXAppID
JMS_IBM_PutAppltype
解决方法
这听起来像是混合了可重试和不可重试的错误处理。 如果您正在跟踪重新交付并需要发送警报,那么您可能不想设置 BOTHRESH 值,而是在您的客户端代码中管理它。
推荐的消费者错误处理模式:
-
如果消息无效(即.. JSON 或 XML 错误),请立即移至 DLQ。消息永远不会提高质量,也没有理由重复重试。
-
如果处理中的“下一步”出现故障(即数据库),则拒绝交付并允许重新交付延迟和回退重试。这也有利于队列中的其他消费者尝试处理消息并消除了一个消费者无法阻止消息的问题。
另外,请考虑使用客户端消费者代码进行监控和警报可能会有问题,因为它结合了不同的功能。如果您的目标是跟踪无效消息,监控 DLQ 通常是一种更好的设计模式,它会从您的消费者代码中删除“监控”代码。
,这样做可以,但代码确实需要对管理通道的管理员访问权限,这对于客户端应用程序来说可能不是最佳选择。
配置
import com.ibm.mq.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.ibm.mq.constants.CMQC;
import java.util.Hashtable;
@Configuration
public class MQConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${ibm.mq.queueManager:QM1}")
public String qMgrName;
@Value("${app.mq.admin.channel:DEV.ADMIN.SVRCONN}")
private String adminChannel;
@Value("${app.mq.host:localhost}")
private String host;
@Value("${app.mq.host.port:1414}")
private int port;
@Value("${app.mq.adminuser:admin}")
private String adminUser;
@Value("${app.mq.adminpassword:passw0rd}")
private String password;
@Bean
public MQQueueManager mqQueueManager() {
try {
Hashtable<String,Object> connectionProperties = new Hashtable<String,Object>();
connectionProperties.put(CMQC.CHANNEL_PROPERTY,adminChannel);
connectionProperties.put(CMQC.HOST_NAME_PROPERTY,host);
connectionProperties.put(CMQC.PORT_PROPERTY,port);
connectionProperties.put(CMQC.USER_ID_PROPERTY,adminUser);
connectionProperties.put(CMQC.PASSWORD_PROPERTY,password);
return new MQQueueManager(qMgrName,connectionProperties);
} catch (MQException e) {
logger.warn("MQException obtaining MQQueueManager");
logger.warn(e.getMessage());
}
return null;
}
}
获取队列的退出阈值
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Runner {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.mq.queue:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
Runner(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Bean
CommandLineRunner init() {
return (args) -> {
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
byte[] charAttrs = new byte[MQC.MQ_Q_NAME_LENGTH];
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName,openOptions,null,null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors,intAttrs,charAttrs);
int boThresh = intAttrs[0];
String backoutQname = new String(charAttrs);
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining threshold");
logger.warn(e.getMessage());
}
};
}
}