问题描述
我正在尝试将ActiveMQ与NMS(C#)使用者一起使用来获取消息,进行一些处理,然后通过HttpClient.PostAsync()将内容发送到Web服务,所有这些都在Windows服务中运行(通过Topshelf)。
我正在与之通信的下游系统非常敏感,并且我使用单独的确认,因此我可以通过确认或触发自定义重试(即不是session.recover)来检查响应并采取相应的措施。
由于下游系统不可靠,因此我一直在尝试几种不同的方法来降低消费者的吞吐量。我以为可以通过转换为同步并使用预取来完成此操作,但似乎没有用。
我的理解是,对于异步使用方,永远不会达到预取“限制”,但是使用同步方法,预取队列只会在确认消息后被吞噬,这意味着我可以调整侦听器以一定速率传递消息下游组件可以处理的。
在队列中加载了100条消息,然后使用侦听器(即异步地)启动我的代码,然后我可以成功记录100 msg的信息通过了。 当我将其更改为使用consumer.Receive()(或ReceiveNowait)时,我再也不会收到消息。
以下是我为同步使用者尝试的内容的摘要,其中包含async选项,但已注释掉:
public Worker(LogWriter logger,ServiceConfiguration config,IConnectionFactory connectionFactory,IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
session = connection.CreateSession(AckNowledgementMode.IndividualAckNowledge);
queue = session.GetQueue(configuration.JmsConfig.sourceQueueName);
consumer = session.CreateConsumer(queue);
// Asynchronous
//consumer.Listener += new MessageListener(OnMessage);
// Synchronous
var message = consumer.Receive(TimeSpan.FromSeconds(5));
while (true)
{
if (!Equals(message,null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}",message.Properties.GetInt("count"),message.Properties.GetInt("NMSXDeliveryCount"));
message.AckNowledge();
}
解决方法
我相信您需要在Start()
上致电connection
,例如:
connection.Start();
呼叫Start()
表示您希望消息流。
还值得注意的是,除了从while(true)
抛出异常之外,没有其他方法可以打破您的OnMessage
循环。