问题描述
|
我是hornetq的新手,请多多包涵。首先让我告诉您我的要求:
我需要一个消息排队中间件,该中间件可以在低延迟和持久性的不同进程之间传递大小约为1k的消息(即它应该在系统崩溃时幸免)。我将有多个进程写入同一队列,并且类似地有多个进程从同一队列读取。
为此,我选择了hornetq,因为它具有持久性传递消息的最佳评分。
我目前使用hornetq v2.2.2Final作为独立服务器。
我能够使用核心api(ClientSession)成功创建持久/非持久队列,并成功将消息发布到队列(ClientProducer)。
同样,我能够使用核心api(ClientConsumer)从队列中读取消息。
问题出在此之后,当客户端读取消息时,消息仍保留在队列中,即,队列中的消息数保持不变。也许我弄错了,但我的印象是,一旦消息被消耗(读取+确认),就将其从队列中删除。再次。
另外,我想告诉我,我已经尝试将非持久性队列与非持久性消息一起使用。但是问题仍然存在。
我正在使用的生产者代码:
public class HQProducer implements Runnable {
private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;
public HQProducer(String host,int port,String address,String queueName,boolean deleteQ,boolean durable,boolean durableMsg,int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put(\"host\",host);
map.put(\"port\",port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(),map);
ServerLocator locator = hornetqClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
if (queueExists(queueName)) {
if (deleteQ) {
System.out.println(\"Deleting existing queue :: \" + queueName);
session.deleteQueue(queueName);
System.out.println(\"Creating queue :: \" + queueName);
session.createQueue(address,queueName,true);
}
} else {
System.out.println(\"Creating new queue :: \" + queueName);
session.createQueue(address,durable);
}
producer = session.createProducer(SimpleString.toSimpleString(address),pRate);
killme = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE,null,ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killme) {
try {
ClientMessage message = session.createMessage(durableMsg);
message.getBodyBuffer().writeString(\"Hello world\");
producer.send(message);
cnt++;
timediff = ((System.currentTimeMillis() - time) / 1000);
if (timediff >= 1) {
System.out.println(\"Producer tps :: \" + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (hornetqException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE,ex);
}
}
try {
session.close();
} catch (hornetqException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE,ex);
}
}
public void setKillMe(boolean killme) {
this.killme = killme;
}
private boolean queueExists(String qname) {
boolean res = false;
try {
//ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
if (queueQuery.isExists()) {
res = true;
}
} catch (hornetqException ex) {
res = false;
}
return res;
}
}
消费者的代码也是:
public class HQConsumer implements Runnable {
private ClientSession session;
private ClientConsumer consumer;
private boolean killMe;
public HQConsumer(String host,boolean browSEOnly) {
try {
HashMap map = new HashMap();
map.put(\"host\",map);
ServerLocator locator = hornetqClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
session.start();
consumer = session.createConsumer(queueName,\"\",-1,browSEOnly);
killMe = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE,ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killMe) {
try {
ClientMessage msgReceived = consumer.receive();
msgReceived.ackNowledge();
//System.out.println(\"message = \" + msgReceived.getBodyBuffer().readString());
cnt++;
timediff = ((System.currentTimeMillis() - time) / 1000);
if (timediff >= 1) {
System.out.println(\"ConSumer tps :: \" + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (hornetqException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE,ex);
}
}
try {
session.close();
} catch (hornetqException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE,ex);
}
}
public void setKillMe(boolean killMe) {
this.killMe = killMe;
}
}
hornetq服务器配置::
<configuration xmlns=\"urn:hornetq\"
xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"
xsi:schemaLocation=\"urn:hornetq /schema/hornetq-configuration.xsd\">
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name=\"netty\">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key=\"host\" value=\"${hornetq.remoting.netty.host:localhost}\"/>
<param key=\"port\" value=\"${hornetq.remoting.netty.port:5445}\"/>
</connector>
<connector name=\"netty-throughput\">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key=\"host\" value=\"${hornetq.remoting.netty.host:localhost}\"/>
<param key=\"port\" value=\"${hornetq.remoting.netty.batch.port:5455}\"/>
<param key=\"batch-delay\" value=\"50\"/>
</connector>
</connectors>
<acceptors>
<acceptor name=\"netty\">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key=\"host\" value=\"${hornetq.remoting.netty.host:localhost}\"/>
<param key=\"port\" value=\"${hornetq.remoting.netty.port:5445}\"/>
</acceptor>
<acceptor name=\"netty-throughput\">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key=\"host\" value=\"${hornetq.remoting.netty.host:localhost}\"/>
<param key=\"port\" value=\"${hornetq.remoting.netty.batch.port:5455}\"/>
<param key=\"batch-delay\" value=\"50\"/>
<param key=\"direct-deliver\" value=\"false\"/>
</acceptor>
</acceptors>
<security-settings>
<security-setting match=\"#\">
<permission type=\"createNonDurableQueue\" roles=\"guest\"/>
<permission type=\"deleteNonDurableQueue\" roles=\"guest\"/>
<permission type=\"createDurableQueue\" roles=\"guest\"/>
<permission type=\"deleteDurableQueue\" roles=\"guest\"/>
<permission type=\"consume\" roles=\"guest\"/>
<permission type=\"send\" roles=\"guest\"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match=\"#\">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
解决方法
使用hornetq核心api,您必须明确确认一条消息。我看不到测试中发生了什么。
如果您没有确认,这就是您的邮件被阻止的原因。我需要查看您完整的示例才能为您提供完整的答案。
另外:您应该使用以下方法定义createSession:createSession(true,true,0)
核心API具有批处理ACK的选项。您没有使用事务处理会话,因此只有达到在serverLocator上配置的ackBatchSize时,才将acks发送到服务器。有了此设置,一旦您在消息中调用accept(),任何确认将被发送到服务器。
您当前使用的选项等效于具有特定DUPS_SIZE的JMS DUPS_OK。
(与您反复讨论后,Post编辑了我的初始答案)
, 设置
ackbatchsize
有助于解决此问题。
谢谢您的帮助