如何在RabbitMQ外部发送确认消费者?

问题描述

我有一个应用程序,它向 RMQ 代理发送消息,如下所示:

 var connectionFactory = new ConnectionFactory()
            {
               HostName = "localhost"
            };

            using (var connection = connectionFactory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("demo",durable:true,exclusive:false,autoDelete:false,arguments:null);

                    Console.WriteLine("Click enters to send random case Id");
                    do
                    {
                        Console.ReadLine();
                        var message = new {CaseId = new Random().Next()};
                        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                        channel.Basicpublish("","demo",null,body);
                        Console.WriteLine("Successfully send message.");
                    } while (true);

                }
            }

成功发送消息。

还有一个应用叫做消费者应用。

代码如下:

private void InitiaterabbitMq()
        {
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost"
            };

            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();

            MessageHandler messageReceiver = new MessageHandler(channel);
            channel.BasicConsume("demo",false,messageReceiver);

        }

消息处理程序是:

public class MessageHandler : DefaultBasicConsumer
    {
        private readonly IModel _channel;
        public MessageHandler(IModel channel)
        {
            _channel = channel;
        }

        public override async void HandleBasicDeliver(string consumerTag,ulong deliveryTag,bool redelivered,string exchange,string routingKey,IBasicProperties properties,ReadOnlyMemory<byte> body)
        {
            var message = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body.ToArray()));

            _processor.process(message);
        }


    }

这是另一个类的进程方法parth:

client.BeginTransfer(transfer,HandleUploadProgressChanged,HandleUploadComplete,HandleUploadInterrupted,HandleUploadCancelled,3600,UploadFilesFinishedCallback);

一旦开始传输完成。它调用UploadFilesFinishedCallback 。我想在这方法中承认。我该怎么做?

解决方法

这是您确认消息的方式:

channel.BasicAck(deliveryTag,false);

所以这意味着你的函数“UploadFilesFinishedCallback”必须有deliveryTag

==> 这意味着您的“流程”功能也必须具有 deliveryTag(目前仅获取消息内容)

解决办法: 向函数“process”和函数“UploadFilesFinishedCallback”添加新参数“deliveryTag”

你可以像这样在回调中使用它:

client.BeginTransfer(transfer,HandleUploadProgressChanged,HandleUploadComplete,HandleUploadInterrupted,HandleUploadCancelled,3600,() => { UploadFilesFinishedCallback(deliveryTag) });

(取决于回调函数的签名)