问题描述
我有一个 BlockingCollection,我从一个线程写入并从另一个线程读取。 生产者线程获取从服务器接收的项目并将它们添加到 BlockingCollection,而读取线程尝试清空 BlockingCollection 并处理它们。
问题我想批量清空队列,因为一一处理会太慢。但是当它不断被写入(数千个项目)时,消费者线程会一直读取它们直到它被清空,这意味着在写入完成之前处理甚至不会开始。
现在,消费者中的处理可以并行完成,所以我一直在想如何去做。
目前我有两个想法:
-
从消费者的 BlockingCollection 中读取一定数量的项目后,启动一个新的并行作业来处理它们,而不是等待完全清空队列然后开始处理。
-
使用多个使用者,并希望它们并行运行,而不是在尝试同时读取 BlockingCollection 时不断相互阻塞。
所以我的问题是关于选项 2 - BlockingCollection 是否针对这种情况进行了内部优化?它会划分读取的区域,还是消费者会为每个项目而争吵?如果是这样,那么选项 1 是否更好?
解决方法
添加另一种选择:(绝不是生产就绪的!)
这利用了 TPL 的数据流,其中 BatchBlock<T>
为我们抽象了批处理。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class HoneyBatcher
{
private const int BATCHSIZE = 10; // Find the size that works best for you.
private readonly BatchBlock<Honey> batchBlock;
private readonly ExecutionDataflowBlockOptions _options =
new ExecutionDataflowBlockOptions()
{
// I'd start with 1,then benchmark if higher number actually benefits.
MaxDegreeOfParallelism = 1,SingleProducerConstrained = true // if so,may micro-optimize throughput
};
// vv Whatever process you want done on a batch
public HoneyBatcher( Action<Honey[]> batchProcessor )
{
// BatchBlock does the batching
// and is the entrypoint to the pipline.
batchBlock = new BatchBlock<Honey>(BATCHSIZE);
// processorBlock processes each batch that batchBlock will produce
// Parallel executions as well as other tweaks can be configured through options.
ActionBlock<Honey[]> processorBlock =
new ActionBlock<Honey[]>(batchProcessor,_options);
// build the pipline
batchBlock.LinkTo(processorBlock);
// item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
}
// Add item individually and have them batched up
// and processed in a pipeline.
public Task<bool> ProcessAsync(Honey item)
{
return batchBlock.SendAsync(item);
// Can also be done with sync API.
}
}
public class Honey
{
// Just a dummy
}
请注意,上面的代码片段只是想法的粗略布局。在生产中,您当然会解决错误处理、完成等问题。
,批量处理项目的自然方法是将它们分批插入到 BlockingCollection
中,而不是稍后分批尝试删除 .换句话说,您可以使用 BlockingCollection<T[]>
而不是 BlockingCollection<T>
。生产者线程可以使用 Queue<T>
:
var queue = new Queue<T>;
while (someCondition)
{
var item = ProduceItem();
queue.Enqueue(item);
if (queue.Count == batchSize)
{
blockingCollection.Add(queue.ToArray());
queue.Clear();
}
}
if (queue.Count > 0)
{
blockingCollection.Add(queue.ToArray());
queue.Clear();
}
blockingCollection.CompleteAdding();
根据情况,您还可以使用一些 LINQ 风格的运算符,例如 Batch
库中的 MoreLinq。
最后,回答您的主要问题,是的,BlockingCollection
类可以出色地处理多个消费者和多个生产者。如果集合为空,所有消费者都会被阻塞,当一个项目到达时,它会被交给一个等待的消费者。