问题描述
我在读取 IBM MQ(C# + IBM.XMS + ibmcom/mq:latest)上的第二条消息时遇到问题。使用默认队列“DEV.QUEUE.1”时,只有在第一条消息到达后先停止连接再重新启动(conn.Stop()/conn.Start()),侦听器才会继续接收后续消息。如果侦听器运行时队列中已经有消息,则这些消息会被立即全部消费。
我用的是 Docker,以下是 MQ 版本信息(也尝试过多个旧版本):
bash-4.4$ dspmqver
Name: IBM MQ
Version: 9.2.2.0
Level: p922-L210310.DE
BuildType: IKAP - (Production)
Platform: IBM MQ for Linux (x86-64 platform)
Mode: 64-bit
O/S: Linux 4.19.128-microsoft-standard
O/S Details: Red Hat Enterprise Linux 8.3 (Ootpa)
InstName: Installation1
InstDesc: IBM MQ V9.2.2.0 (Unzipped)
Primary: N/A
InstPath: /opt/mqm
DataPath: /mnt/mqm/data
MaxCmdLevel: 922
LicenseType: Developer
使用的代码:
using IBM.XMS;
...
public IConnection conn;
/// <summary>
/// Entry point: creates a Program instance, runs Setup on it, then waits for a line of console input.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    var listenerHost = new Program();
    listenerHost.Setup();

    // Block on console input so the process does not end as soon as Setup returns.
    Console.ReadLine();
}
/// <summary>
/// Creates an XMS connection to queue manager "QM1" on localhost:1414 in client mode,
/// registers <c>OnMessage</c> as the message listener for "DEV.QUEUE.1", and starts the connection.
/// The connection is stored in the <c>conn</c> field so the listener callback can reach it.
/// </summary>
public void Setup()
{
    // XMS type and constant names are case-sensitive (XMSFactoryFactory, XMSC, AcknowledgeMode);
    // the mis-cased spellings do not exist in IBM.XMS and fail to compile.
    XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
    IConnectionFactory cf = factoryFactory.CreateConnectionFactory();

    // Connection settings. NOTE(review): host, channel and credentials are hard-coded for this demo;
    // move them to configuration before using this outside a local test setup.
    cf.SetStringProperty(XMSC.WMQ_HOST_NAME, "localhost");
    cf.SetIntProperty(XMSC.WMQ_PORT, 1414); // the author also left 9443 here as an alternative
    cf.SetStringProperty(XMSC.WMQ_CHANNEL, "DEV.ADMIN.SVRCONN");
    cf.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    cf.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "QM1");
    cf.SetIntProperty(XMSC.WMQ_BROKER_VERSION, XMSC.WMQ_BROKER_V1);
    cf.SetStringProperty(XMSC.USERID, "admin");
    cf.SetStringProperty(XMSC.PASSWORD, "passw0rd");

    conn = cf.CreateConnection();
    Console.WriteLine("connection created");

    // Non-transacted session with automatic acknowledgement.
    ISession sess = conn.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
    IDestination dest = sess.CreateQueue("DEV.QUEUE.1");
    IMessageConsumer consumer = sess.CreateConsumer(dest);

    // Register the callback before starting the connection.
    MessageListener ml = new MessageListener(OnMessage);
    consumer.MessageListener = ml;

    conn.Start();
    Console.WriteLine("Consumer started");
}
/// <summary>
/// Message listener callback registered in Setup. Prints the text of the received message,
/// then stops and restarts the connection held in <c>conn</c>.
/// </summary>
/// <param name="msg">The message delivered by the consumer.</param>
private void OnMessage(IMessage msg)
{
    // An unconditional (ITextMessage) cast throws InvalidCastException inside the callback
    // when the message is not a text message, so check the type first.
    ITextMessage textMsg = msg as ITextMessage;
    if (textMsg != null)
    {
        Console.WriteLine("Got a message: " + textMsg.Text);
    }
    else
    {
        Console.WriteLine("Got a non-text message: " + (msg == null ? "null" : msg.GetType().Name));
    }

    // Workaround kept from the original (MUST BE CHANGED): for some reason new messages are not
    // delivered after the first one unless the connection is restarted.
    conn.Stop();
    conn.Start();
}
谢谢
解决方法
这就是我的做法。您需要主动请求(浏览)下一条消息,处理完成后再将其从 MQ 中删除。
/// <summary>
/// Stores the connection settings in the QueueManagerName, QueueName and ChannelInfo members,
/// splits the channel info on '/' into channel name, transport type and connection name,
/// and creates the MQQueueManager held in <c>queueManager</c>.
/// </summary>
/// <param name="strQueueManagerName">Queue manager name passed to the MQQueueManager constructor.</param>
/// <param name="strQueueName">Queue name; only stored here, no queue is opened in the visible code.</param>
/// <param name="strChannelInfo">Three '/'-separated parts: channel name, transport type, connection name.</param>
// NOTE(review): this excerpt ends inside the method - the catch block, the return statement and the
// closing brace are not shown. init compares the result against "Connected Successfully".
public string ConnectMQ(string strQueueManagerName,string strQueueName,string strChannelInfo)
{
// Keep the caller's settings in the class members.
QueueManagerName = strQueueManagerName;
QueueName = strQueueName;
ChannelInfo = strChannelInfo;
// Split "channel/transport/connection" into its three parts.
char[] separator = { '/' };
string[] ChannelParams;
ChannelParams = ChannelInfo.Split(separator);
// Indexing happens before the try block, so fewer than three parts throws IndexOutOfRangeException here.
channelName = ChannelParams[0];
// transportType is stored but not used in the visible part of this method.
transportType = ChannelParams[1];
connectionName = ChannelParams[2];
// connectionName = "";
String strReturn = "";
try
{
queueManager = new MQQueueManager(QueueManagerName,channelName,connectionName);
// Set only when the constructor above did not throw.
strReturn = "Connected Successfully";
}
/// <summary>
/// Performs one Get on the given queue using get-message options chosen by <paramref name="position"/>,
/// reads the message body as a string into the result, then resets the shared message object.
/// </summary>
/// <param name="position">
/// "First" selects MQGMO_BROWSE_FIRST, "Next" selects MQGMO_BROWSE_NEXT and "Remove" selects
/// MQGMO_MSG_UNDER_CURSOR. Any other value leaves the options of a new MQGetMessageOptions untouched.
/// </param>
/// <param name="queue">The queue to read from.</param>
// NOTE(review): this excerpt ends inside the method - the catch block, the return statement and the
// closing brace are not shown. init compares the result with strings such as
// "Exception : MQRC_NO_MSG_AVAILABLE", so presumably the missing catch returns those - confirm.
public String BrowseMsg(String position,MQQueue queue)
{
String strReturn = "";
try
{
// A fresh message object and options object are created for every call.
queueMessage = new MQMessage();
queueMessage.Format = MQC.MQFMT_STRING;
queueGetMessageOptions = new MQGetMessageOptions();
if (position.Equals("First"))
{
queueGetMessageOptions.Options = IBM.WMQ.MQC.MQGMO_BROWSE_FIRST;
}
if (position.Equals("Next"))
{
queueGetMessageOptions.Options = IBM.WMQ.MQC.MQGMO_BROWSE_NEXT;
}
if (position.Equals("Remove"))
{
// Presumably a destructive get of the message the browse cursor points at - verify against the MQ docs.
queueGetMessageOptions.Options = IBM.WMQ.MQC.MQGMO_MSG_UNDER_CURSOR;
}
queue.Get(queueMessage,queueGetMessageOptions);
// Read the whole body (MessageLength) as a string.
strReturn = queueMessage.ReadString(queueMessage.MessageLength);
// Reset the identifiers on the shared message object.
queueMessage.MessageId = MQC.MQMI_NONE;
queueMessage.CorrelationId = MQC.MQCI_NONE;
// Optional: remove the data from the message object.
queueMessage.ClearMessage();
}
/// <summary>
/// Reads the queue manager name, queue name and channel info from the application settings,
/// connects through ConnectMQ, opens the queue, and browses it message by message with BrowseMsg
/// until one of the stop responses is returned.
/// </summary>
// NOTE(review): this excerpt ends before the catch block and the method's closing brace.
public void init()
{
try
{
// Connection settings come from the appSettings section; a missing key makes the indexer return
// null and the ToString() call throw.
QueueManagerName = ConfigurationManager.AppSettings["QueueManagerName"].ToString();
QueueName = ConfigurationManager.AppSettings["QueueName"].ToString();
ChannelInfo = ConfigurationManager.AppSettings["ChannelInfo"].ToString();
String strConnectionSuccess = String.Empty;
strConnectionSuccess = "false";
String strMQResponse = String.Empty;
// xmlMQRequestResponse is not used in the visible part of this method.
XmlDocument xmlMQRequestResponse = new XmlDocument();
strConnectionSuccess = ConnectMQ(QueueManagerName,QueueName,ChannelInfo);
// Give up unless ConnectMQ reported success.
if (!strConnectionSuccess.Equals("Connected Successfully"))
{
return;
}
// count tracks how many messages have been browsed; 0 means the first browse has not happened yet.
int count = 0;
// Open the queue with the input, browse and fail-if-quiescing open options.
queue = queueManager.AccessQueue(QueueName,MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_BROWSE + MQC.MQOO_FAIL_IF_QUIESCING);
// NOTE(review): strMQResponse is still String.Empty here, so this condition cannot be true at this point.
if (strMQResponse.Equals("MQRC_GET_INHIBITED") || strMQResponse.Equals("Exception : MQRC_GET_INHIBITED"))
{
log4net.LogManager.GetLogger("RollingFileAppender").Debug(strMQResponse);
}
// Keep browsing until BrowseMsg reports no message available or get-inhibited.
while (!strMQResponse.Equals("Exception : MQRC_NO_MSG_AVAILABLE") && !strMQResponse.Equals("MQRC_GET_INHIBITED") && !strMQResponse.Equals("Exception : MQRC_GET_INHIBITED"))
{
if (count == 0)
{
strMQResponse = BrowseMsg("First",queue);
}
else
{
strMQResponse = BrowseMsg("Next",queue);
}
if (strMQResponse.Equals("Exception : MQRC_NO_MSG_AVAILABLE"))
{
// continue re-evaluates the loop condition, which is now false, so the loop ends here.
continue;
}
// Placeholder: new Boolean() is false, so the remove branch below never runs as written.
Boolean boosuccess = new Boolean();
// If processing succeeded, do something with the data here and set boosuccess accordingly.
if (boosuccess.Equals(true))
{
// Remove the message that was just browsed.
BrowseMsg("Remove",queue);
}
count++;
}
}