>我一次最多只能获得256条消息,甚至可以根据消息大小随机获取.
>即使我查看有多少消息在等待,我也不知道有多少RequestBatch调用,因为我不知道每次调用会给我多少消息.由于消息将继续进入,我不能继续发出请求,直到它为空,因为它永远不会是空的.
我决定只使用比浪费偷看更便宜的消息监听器,并且会给我更多的控制权.
Basically I am trying to let a set number of messages build up and
then process them at once. I use a timer to force a delay but I need
to be able to queue my items as they come in.
基于我的计时器要求,似乎阻塞集合不是一个好选项,所以我试图使用ConcurrentBag.
var batchingQueue = new ConcurrentBag<brokeredMessage>(); myQueueClient.OnMessage((m) => { Console.WriteLine("queueing message"); batchingQueue.Add(m); }); while (true) { var sw = WaitableStopwatch.StartNew(); brokeredMessage msg; while (batchingQueue.TryTake(out msg)) // <== Object is already disposed { ...do this until I have a thousand ready to be written to DB in batch Console.WriteLine("Completing message"); msg.Complete(); // <== ERRORS HERE } sw.Wait(MINIMUM_DELAY); }
However as soon as I access the message outside of the OnMessage
pipeline it shows the brokeredMessage as already being disposed.
我认为这必须是OnMessage的一些自动行为,我没有看到任何方式对消息做任何事情,除了立即处理它我不想做.
解决方法
var batchingQueue = new BlockingCollection<brokeredMessage>(); myQueueClient.OnMessage((m) => { Console.WriteLine("queueing message"); batchingQueue.Add(m); });
和你的消费者线程:
foreach (var msg in batchingQueue.GetConsumingEnumerable()) { Console.WriteLine("Completing message"); msg.Complete(); }
GetConsumingEnumerable返回一个迭代器,该迭代器使用队列中的项目,直到设置了IsCompleted属性并且队列为空.如果队列为空但IsCompleted为False,则会执行非忙等待下一个项目.
要取消使用者线程(即关闭程序),您将停止向队列添加内容并让主线程调用batchingQueue.CompleteAdding.使用者将清空队列,看到IsCompleted属性为True,然后退出.
在这里使用BlockingCollection比ConcurrentBag或ConcurrentQueue更好,因为BlockingCollection接口更容易使用.特别是,使用GetConsumingEnumerable可以使您不必担心检查计数或进行忙等待(轮询循环).它只是有效.
另请注意,ConcurrentBag有一些相当奇怪的删除行为.特别是,删除项目的顺序根据哪个线程删除项目而不同.创建包的线程以与其他线程不同的顺序删除项目.有关详细信息,请参阅Using the ConcurrentBag Collection.
您还没有说明为什么要在输入中批量处理项目.除非有一个压倒一切的性能原因,否则使用该批处理逻辑使代码复杂化似乎并不是一个特别好的主意.
如果你想对数据库进行批量写入,那么我建议使用一个简单的List< T>缓冲项目.如果必须在将项目写入数据库之前对其进行处理,请使用上面显示的技术来处理它们.然后,而是直接写入数据库,将项目添加到列表中.当列表获得1,000个项目或经过一定时间后,分配新列表并启动任务以将旧列表写入数据库.像这样:
// at class scope // Flush every 5 minutes. private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5); private const int MaxBufferItems = 1000; // Create a timer for the buffer flush. System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush,FlushDelay.TotalMilliseconds,Timeout.Infinite); // A lock for the list. Unless you're getting hundreds of thousands // of items per second,this will not be a performance problem. object _listLock = new Object(); List<brokeredMessage> _recordBuffer = new List<brokeredMessage>();
然后,在您的消费者中:
foreach (var msg in batchingQueue.GetConsumingEnumerable()) { // process the message Console.WriteLine("Completing message"); msg.Complete(); lock (_listLock) { _recordBuffer.Add(msg); if (_recordBuffer.Count >= MaxBufferItems) { // Stop the timer _flushTimer.Change(Timeout.Infinite,Timeout.Infinite); // Save the old list and allocate a new one var myList = _recordBuffer; _recordBuffer = new List<brokeredMessage>(); // Start a task to write to the database Task.Factory.StartNew(() => FlushBuffer(myList)); // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite); } } } private void TimedFlush() { bool lockTaken = false; List<brokeredMessage> myList = null; try { if (Monitor.TryEnter(_listLock,out lockTaken)) { // Save the old list and allocate a new one myList = _recordBuffer; _recordBuffer = new List<brokeredMessage>(); } } finally { if (lockTaken) { Monitor.Exit(_listLock); } } if (myList != null) { FlushBuffer(myList); } // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite); }
这里的想法是,您将旧列表排除在外,分配新列表以便继续处理,然后将旧列表的项目写入数据库.锁是为了防止计时器和记录计数器相互踩踏.没有锁定,事情可能会在一段时间内正常工作,然后你会在不可预测的时间发生奇怪的崩溃.
我喜欢这种设计,因为它消除了消费者的轮询.我唯一不喜欢的是消费者必须知道计时器(即它必须停止然后重新启动计时器).稍微考虑一下,我可以消除这个要求.但它的编写方式很好.