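Problem description
I'm trying to create a queue/pool of RabbitMQ channels for reuse. Reusing channels is what is recommended, but nothing is provided that shows how to actually do it!
I have this working, but ideally I don't want to populate the channels when the constructor is called. I want to create at most 9 channels, and create each one only when it is needed.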
// COMMENT THIS LINE OUT
GetConnection();
// COMMENT THESE LINE OUT
for (var i = 0; i < MAX_CHANNELS; i++)
{
    _channelQueue.Add(_connection.CreateModel());
    _channelCount++;
}
// END
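When I remove these lines, creating the channel and/or connection takes 13 seconds the first time, but if I leave the code in, that does not happen! This confuses me, and I'm not sure what is going on.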
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using RabbitMQ.Client;

namespace Enqueue
{
    public class RabbitMqChannelPool
    {
        private readonly string _hostName;
        private const int MAX_CHANNELS = 9;
        // Keep track of the number of good channels
        private int _channelCount = 0;
        private IConnection _connection;
        private readonly BlockingCollection<IModel> _channelQueue = new BlockingCollection<IModel>(MAX_CHANNELS);
        private Object connectionLock = new Object();
        private Object channelLock = new Object();

        public RabbitMqChannelPool(string hostName)
        {
            _hostName = hostName;
            // COMMENT THIS LINE OUT
            GetConnection();
        }

        private IConnection GetConnection()
        {
            lock (connectionLock)
            {
                if (_connection == null || !_connection.IsOpen)
                {
                    Stopwatch sw = new Stopwatch();
                    sw.Start();
                    _connection = CreateNewConnection();
                    // Having this here makes it run fast,creating on the fly seems to block for ages on the first one
                    // COMMENT THESE LINE OUT
                    for (var i = 0; i < MAX_CHANNELS; i++)
                    {
                        _channelQueue.Add(_connection.CreateModel());
                        _channelCount++;
                    }
                    // END
                    sw.Stop();
                    System.Diagnostics.Debug.WriteLine($"Connection created in { sw.Elapsed.Seconds}.{sw.Elapsed.Milliseconds} seconds");
                }
                System.Diagnostics.Debug.WriteLine("Returning Connection");
                return _connection;
            }
        }

        private IConnection CreateNewConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = _hostName
            };
            return factory.CreateConnection();
        }

        /// <summary>
        /// Get a channel if at all possible
        /// </summary>
        /// <returns></returns>
        public IModel DequeueChannel()
        {
            lock (channelLock)
            {
                System.Diagnostics.Debug.WriteLine("Getting channel from queue");
                // if we have not used up the limit then return one
                // This should then be put back on the queue after use
                if (_channelCount < MAX_CHANNELS)
                {
                    System.Diagnostics.Debug.WriteLine("Returning new channel");
                    return CreateNewChannel();
                }
                // Otherwise block until one becomes available
                System.Diagnostics.Debug.WriteLine("Waiting for queue...");
                IModel channel = _channelQueue.Take(); // this blocks till there is something there
                return channel;
            }
        }

        /// <summary>
        /// Add the channel back on the queue if it seems fine
        /// </summary>
        /// <param name="channel"></param>
        public void EnqueueChannel(IModel channel)
        {
            if (channel != null && channel.IsOpen)
            {
                System.Diagnostics.Debug.WriteLine("Putting channel back on queue");
                _channelQueue.Add(channel);
            }
            else
            {
                System.Diagnostics.Debug.WriteLine("Channel was bad");
                _channelCount--;
            }
        }

        private IModel CreateNewChannel()
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            var channel = GetConnection().CreateModel();
            sw.Stop();
            _channelCount++;
            System.Diagnostics.Debug.WriteLine($"Channel {_channelCount} created in { sw.Elapsed.Seconds}.{sw.Elapsed.Milliseconds} seconds");
            return channel;
        }

        public int ChannelQueueSize => _channelCount;
    }
}
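For comparison, here is a minimal sketch of the "create lazily, up to a cap" idea on its own, without RabbitMQ. It is not a fix for the 13 second delay. Note that DequeueChannel above creates a new channel whenever _channelCount is below MAX_CHANNELS, even if an idle channel is already sitting in the queue; the sketch checks for an idle item first. LazyPool<T>, Rent, Return and LazyPoolDemo are hypothetical names made up for this illustration, not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not part of RabbitMQ.Client): creates items on demand,
// up to a fixed maximum, and hands out returned items before creating new ones.
public sealed class LazyPool<T> where T : class
{
    private readonly Func<T> _factory;
    private readonly ConcurrentBag<T> _idle = new ConcurrentBag<T>();
    // One permit per item that may be checked out at the same time.
    private readonly SemaphoreSlim _permits;
    private int _created;

    public LazyPool(int maxItems, Func<T> factory)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        _permits = new SemaphoreSlim(maxItems, maxItems);
    }

    // Number of items created so far and not discarded.
    public int Created => Volatile.Read(ref _created);

    // Blocks only while maxItems items are already checked out.
    public T Rent()
    {
        _permits.Wait();
        try
        {
            if (_idle.TryTake(out var item))
            {
                return item;                 // reuse an idle item first
            }
            item = _factory();               // otherwise create one lazily
            Interlocked.Increment(ref _created);
            return item;
        }
        catch
        {
            _permits.Release();              // do not leak the permit if creation fails
            throw;
        }
    }

    // Hands an item back; pass healthy: false to discard a broken one.
    public void Return(T item, bool healthy = true)
    {
        if (item == null) throw new ArgumentNullException(nameof(item));
        if (healthy)
        {
            _idle.Add(item);
        }
        else
        {
            Interlocked.Decrement(ref _created);
        }
        _permits.Release();
    }
}

public static class LazyPoolDemo
{
    // Rents twice in sequence and reports how many items were created.
    public static int Run()
    {
        var pool = new LazyPool<object>(9, () => new object());
        var first = pool.Rent();
        pool.Return(first);
        var second = pool.Rent();            // reuses the idle item
        pool.Return(second);
        return pool.Created;                 // 1: the second Rent created nothing new
    }
}
```

With the types from the question, the factory would presumably be something like () => connection.CreateModel() with T set to IModel, and channel.IsOpen passed as the healthy flag. That wiring is an assumption and was not run against a broker.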
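using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Enqueue;
using NUnit.Framework;

// The fixture class name and the field declaration are assumed;
// the original post showed only the members below.
[TestFixture]
public class RabbitMqChannelPoolTests
{
    private RabbitMqChannelPool _rabbitMqChannel;

    [SetUp]
    public void Setup()
    {
        _rabbitMqChannel = new RabbitMqChannelPool("localhost");
    }

    [Test]
    public void WhenMoreChannelsRequestedThanAvailable_ThenBlockAndWaitTillAvailable()
    {
        // Arrange
        // Act
        List<Task> tasks = new List<Task>();
        for (var i = 0; i < 20; i++)
        {
            tasks.Add(Task.Factory.StartNew(DoSomeWork));
        }
        Task.WaitAll(tasks.ToArray());
        // Assert
        Assert.AreEqual(9, _rabbitMqChannel.ChannelQueueSize);
    }

    private void DoSomeWork()
    {
        var channel = _rabbitMqChannel.DequeueChannel();
        Thread.Sleep(100); // simulate doing some work
        _rabbitMqChannel.EnqueueChannel(channel);
    }
}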
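When I run this as a unit test in debug mode, I see the output below. It reports around 13.8 seconds for the first connection and channel, but if I leave the code in so that the queue is populated up front, it runs quickly?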
Getting channel from queue
Returning new channel
Connection created in 13.861 seconds
Returning Connection
Channel 1 created in 13.876 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 2 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 3 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 4 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 5 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 6 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 7 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 8 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 9 created in 0.3 seconds
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
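A note on reading the timings in the output above: the code formats them as sw.Elapsed.Seconds, a dot, then sw.Elapsed.Milliseconds. Both are components of the TimeSpan, and the millisecond part is not zero-padded, so "0.2 seconds" means 0 seconds and 2 milliseconds, not 0.2 seconds. The 13.861 figure happens to read correctly either way. A small sketch of the difference follows; ElapsedFormat and its method names are hypothetical and only for illustration.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for illustration only.
public static class ElapsedFormat
{
    // Same format as in the question: seconds component, a dot,
    // then the milliseconds component without zero-padding.
    public static string AsInQuestion(TimeSpan elapsed) =>
        $"{elapsed.Seconds}.{elapsed.Milliseconds}";

    // Total elapsed time as fractional seconds with three decimals.
    public static string AsTotalSeconds(TimeSpan elapsed) =>
        elapsed.TotalSeconds.ToString("F3", CultureInfo.InvariantCulture);
}
```

For a 2 millisecond TimeSpan, AsInQuestion gives "0.2" and AsTotalSeconds gives "0.002".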
Solution
I decided to stop working on this and look into MassTransit instead, because it sounds like it can handle these things for me: https://masstransit-project.com/