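C# RabbitMQ channel pool/queue approach

Problem description

I'm trying to create a queue/pool of RabbitMQ channels for reuse. They recommend doing this, but don't provide any way of actually doing it.

I have this working, but ideally I don't want to fill the pool with channels when the constructor is called. I want to create at most 9 channels, and create each one only when it is needed.

I want to remove the following from the full example shown below: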

// COMMENT THIS LINE OUT
GetConnection();

// COMMENT THESE LINES OUT
for (var i = 0; i < MAX_CHANNELS; i++)
{
    _channelQueue.Add(_connection.CreateModel());
    _channelCount++;
}
// END

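When I remove these, creating the first channel and/or connection takes 13 seconds, but if I leave the code in, this doesn't happen! This confuses me, and I'm not sure what is going on.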

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using RabbitMQ.Client;

namespace Enqueue
{
    public class RabbitMqChannelPool
    {
        private readonly string _hostName;

        private const int MAX_CHANNELS = 9;
        
        // Keep track of the number of good channels
        private int _channelCount = 0;

        private IConnection _connection;
        
        private readonly BlockingCollection<IModel> _channelQueue = new BlockingCollection<IModel>(MAX_CHANNELS);

        private Object connectionLock = new Object();
        private Object channelLock = new Object();

        public RabbitMqChannelPool(string hostName)
        {
            _hostName = hostName;
            // COMMENT THIS LINE OUT
            GetConnection();
        }
        
        private IConnection GetConnection()
        {
            lock (connectionLock)
            {
                if (_connection == null || !_connection.IsOpen)
                {
                    Stopwatch sw = new Stopwatch();
                    sw.Start();
                    _connection = CreateNewConnection();
                    // Having this here makes it run fast; creating on the fly seems to block for ages on the first one
                    // COMMENT THESE LINES OUT
                    for (var i = 0; i < MAX_CHANNELS; i++)
                    {
                        _channelQueue.Add(_connection.CreateModel());
                        _channelCount++;
                    }
                    // END
                    sw.Stop();
                    System.Diagnostics.Debug.WriteLine($"Connection created in { sw.Elapsed.Seconds}.{sw.Elapsed.Milliseconds} seconds");
                }
                System.Diagnostics.Debug.WriteLine("Returning Connection");
                return _connection;
            }
        }

        private IConnection CreateNewConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = _hostName
            };

            return factory.CreateConnection();
        }

        /// <summary>
        /// Get a channel if at all possible
        /// </summary>
        /// <returns></returns>
        public IModel DequeueChannel()
        {
            lock (channelLock)
            {
                System.Diagnostics.Debug.WriteLine("Getting channel from queue");
                // if we have not used up the limit then return one
                // This should then be put back on the queue after use
                if (_channelCount < MAX_CHANNELS)
                {
                    System.Diagnostics.Debug.WriteLine("Returning new channel");
                    return CreateNewChannel();
                }

                // Otherwise block until one becomes available
                System.Diagnostics.Debug.WriteLine("Waiting for queue...");
                IModel channel = _channelQueue.Take(); // this blocks till there is something there
                return channel;
            }
        }

        /// <summary>
        /// Add the channel back on the queue if it seems fine
        /// </summary>
        /// <param name="channel"></param>
        public void EnqueueChannel(IModel channel)
        {
            if (channel != null && channel.IsOpen)
            {
                System.Diagnostics.Debug.WriteLine("Putting channel back on queue");
                _channelQueue.Add(channel);
            }
            else
            {
                System.Diagnostics.Debug.WriteLine("Channel was bad");
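                // This method runs outside channelLock, so update the counter atomically
                System.Threading.Interlocked.Decrement(ref _channelCount);
            }
        }

        private IModel CreateNewChannel()
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            var channel = GetConnection().CreateModel();
            sw.Stop();
            // Atomic increment, paired with the atomic decrement in EnqueueChannel
            var count = System.Threading.Interlocked.Increment(ref _channelCount);
            System.Diagnostics.Debug.WriteLine($"Channel {count} created in { sw.Elapsed.Seconds}.{sw.Elapsed.Milliseconds} seconds");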
            return channel;
        }

        public int ChannelQueueSize => _channelCount;
    }
}

I have this unit test to try to simulate the pool being called in quick succession:

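using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Enqueue;
using NUnit.Framework;

// The original snippet omitted the usings, fixture class, and field; the class name here is illustrative.
[TestFixture]
public class RabbitMqChannelPoolTests
{
    private RabbitMqChannelPool _rabbitMqChannel;

    [SetUp]
    public void Setup()
    {
        _rabbitMqChannel = new RabbitMqChannelPool("localhost");
    }

    [Test]
    public void WhenMoreChannelsRequestedThanAvailable_ThenBlockAndWaitTillAvailable()
    {
        // Arrange
        // Act
        List<Task> tasks = new List<Task>();

        for (var i = 0; i < 20; i++)
        {
            tasks.Add(Task.Factory.StartNew(DoSomeWork));
        }

        Task.WaitAll(tasks.ToArray());

        // Assert
        Assert.AreEqual(9, _rabbitMqChannel.ChannelQueueSize);
    }

    private void DoSomeWork()
    {
        var channel = _rabbitMqChannel.DequeueChannel();
        Thread.Sleep(100); // simulate doing some work
        _rabbitMqChannel.EnqueueChannel(channel);
    }
}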
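
One thing that may be worth checking in this test (a hypothesis only, nothing here confirms it): all 20 tasks are started with Task.Factory.StartNew and most of them immediately block, either on the lock in DequeueChannel or on Take(). If the client library needs thread-pool threads while the first connection is being set up, those blocked workers could delay it until the pool adds more threads. The sketch below is one way to test that idea without RabbitMQ involved: it runs the blocking work on dedicated threads via TaskCreationOptions.LongRunning and prints the pool's current minimum thread counts. StarvationCheck and RunBlockingWork are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class StarvationCheck
{
    // Runs `count` blocking work items on dedicated threads instead of pool workers
    // and returns how many of them completed.
    public static int RunBlockingWork(int count, Action work)
    {
        var completed = 0;
        var tasks = new List<Task>();
        for (var i = 0; i < count; i++)
        {
            // LongRunning hints the scheduler to use a dedicated thread per task.
            tasks.Add(Task.Factory.StartNew(() =>
            {
                work();
                Interlocked.Increment(ref completed);
            }, TaskCreationOptions.LongRunning));
        }
        Task.WaitAll(tasks.ToArray());
        return completed;
    }

    public static void Main()
    {
        // The minimums are the number of threads the pool creates on demand
        // before it starts throttling thread creation.
        ThreadPool.GetMinThreads(out var workers, out var io);
        Console.WriteLine($"Min worker threads: {workers}, min I/O threads: {io}");

        var done = RunBlockingWork(20, () => Thread.Sleep(100));
        Console.WriteLine($"Completed {done} work items");
    }
}
```

If swapping the test's StartNew call for the LongRunning variant makes the 13-second delay disappear, thread-pool starvation is the likely cause; if the delay stays, the cause is elsewhere.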

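When I run the unit test in debug mode, I see output like the following. It reports 13.861 seconds for the connection, yet if I leave in the code that fills the queue up front, it runs quickly?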

Getting channel from queue
Returning new channel
Connection created in 13.861 seconds
Returning Connection
Channel 1 created in 13.876 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 2 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 3 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 4 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 5 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 6 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 7 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 8 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 9 created in 0.3 seconds
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
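
A caveat when reading the timings above: the log format string joins Elapsed.Seconds and Elapsed.Milliseconds with a dot and no zero-padding, so an elapsed time of 2 ms prints as "0.2", not "0.002". Whether lines such as "Channel 2 created in 0.2 seconds" really mean 2 ms cannot be determined from the log alone. A minimal sketch of an unambiguous format follows; Timing and FormatSeconds are hypothetical names.

```csharp
using System;
using System.Globalization;

public static class Timing
{
    // Formats an elapsed time as total seconds with three decimal places.
    public static string FormatSeconds(TimeSpan elapsed)
    {
        return elapsed.TotalSeconds.ToString("F3", CultureInfo.InvariantCulture);
    }

    public static void Main()
    {
        var twoMs = TimeSpan.FromMilliseconds(2);

        // The pattern used in the question: the components are not zero-padded.
        Console.WriteLine($"{twoMs.Seconds}.{twoMs.Milliseconds}"); // prints 0.2

        // Total seconds with fixed precision.
        Console.WriteLine(FormatSeconds(twoMs));                     // prints 0.002
    }
}
```

The 13.861 and 13.876 figures are unaffected, since a three-digit millisecond component needs no padding.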

Solution

I decided to stop work on this and look into MassTransit instead, since it sounds like it can handle these things for me: https://masstransit-project.com/
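
For anyone who still wants the behaviour asked for in the question (a fixed upper limit, with each channel created only on first need), here is a minimal sketch of a lazily-filled pool. It is generic and takes a factory delegate, so it makes no claims about the RabbitMQ client itself. LazyPool, Rent, Return, and CreatedCount are hypothetical names, and this is one possible design rather than a recommended implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical generic pool: creates items on demand, never more than maxSize.
public sealed class LazyPool<T> where T : class
{
    private readonly Func<T> _factory;
    private readonly ConcurrentBag<T> _idle = new ConcurrentBag<T>();
    private readonly SemaphoreSlim _slots; // one permit per item that may be rented
    private int _created;

    public LazyPool(int maxSize, Func<T> factory)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        _slots = new SemaphoreSlim(maxSize, maxSize);
    }

    // Number of live items created so far (idle plus rented).
    public int CreatedCount => _created;

    // Blocks while maxSize items are already rented.
    public T Rent()
    {
        _slots.Wait();
        try
        {
            // Reuse an idle item first; create a new one only if none is idle.
            if (_idle.TryTake(out var item)) return item;
            var created = _factory();
            Interlocked.Increment(ref _created);
            return created;
        }
        catch
        {
            _slots.Release(); // give the permit back if creation failed
            throw;
        }
    }

    // Returns an item to the pool; pass healthy: false to drop a broken one.
    public void Return(T item, bool healthy = true)
    {
        if (healthy && item != null) _idle.Add(item);
        else Interlocked.Decrement(ref _created);
        _slots.Release();
    }
}
```

With the class from the question, the factory would presumably be something like () => GetConnection().CreateModel(), and Return(channel, channel != null && channel.IsOpen) would mirror the health check in EnqueueChannel. Because a permit is taken before the idle bag is checked, an idle channel is always reused before a new one is created. The sketch does not address the 13-second delay on the first connection.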
