问题描述
我正在使用 ActionBlock<T> 并发地处理一个队列。
这里的难点在于:处理队列中的某个项目时,它可能依赖于队列中的另一个项目,因此必须等到那个项目处理完成、依赖关系得到满足之后,才能处理它。
我认为借助 TPL Dataflow 库的链接(linking)、推迟(postponement)以及释放被推迟消息等功能应该能够实现这一点,但我不确定该使用哪种构造。
伪代码如下:
/// <summary>
/// A unit of work identified by <see cref="Name"/> that may depend on other items by name.
/// </summary>
public class Item
{
    /// <summary>
    /// Name of the item. It is used as the item's key, so it is presumably expected to be
    /// unique among the items processed together.
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// Names of the items that must be processed before this one.
    /// Initialized to an empty list, so collection-initializer syntax
    /// (<c>DependsOn = { "Pear" }</c>) works without allocating a list first.
    /// </summary>
    public List<string> DependsOn { get; set; } = new List<string>();
}
// Pseudo-code from the question. The block is declared (as null) before it is assigned,
// presumably so that the lambda could later refer to the block itself, e.g. to re-post
// an item whose dependencies are not yet satisfied. Re-declaring it with `var` would not
// compile (duplicate local), so the second statement is a plain assignment.
ActionBlock<Item> block = null;
block = new ActionBlock<Item>(o =>
{
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn))
    {
        // enqueue a callback when ALL dependencies have been completed
    }
    else
    {
        DoWork(o);
    }
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

// "Apple" depends on "Pear", so "Pear" must be processed first.
var items = new[]
{
    new Item { Name = "Apple", DependsOn = { "Pear" } },
    new Item { Name = "Pear" }
};
解决方法
我不确定这是否对您有帮助,不过下面是一个自定义的 DependencyTransformBlock 类:它了解所接收项目之间的依赖关系,只有在某个项目的所有依赖项都已成功处理之后,才会处理该项目。该自定义块支持普通 TransformBlock 的所有内置功能,但 EnsureOrdered 选项除外。
此类的构造函数接受一个 Func<TInput,TKey> lambda,用于获取每个项目的键;以及一个 Func<TInput,IReadOnlyCollection<TKey>> lambda,用于获取该项目的依赖项。键必须唯一;如果发现重复的键,该块将失败(进入 faulted 状态)。
如果项目之间存在循环依赖,受影响的项目将保持未处理状态。块完成后,可以通过 TInput[] Unprocessed 属性获取这些未处理的项目。此外,如果某个依赖项从未被提供(即没有任何输入项目具有该键),依赖它的项目同样会保持未处理状态。
/// <summary>
/// A dataflow block that transforms each <typeparamref name="TInput"/> into a
/// <typeparamref name="TOutput"/>, but only after every item it depends on (identified by
/// key) has been transformed successfully.
/// </summary>
/// <remarks>
/// <para>
/// Some items are never processed:
/// items whose dependencies form a cycle, items that depend on a key never received as input,
/// and items whose transform failed. They can be retrieved through <see cref="Unprocessed"/>
/// once the block has completed.
/// </para>
/// <para>
/// Keys must be unique. Receiving a second input with an existing key faults the block
/// with an <see cref="InvalidOperationException"/>.
/// </para>
/// <para>
/// Items reach the internal transform stage in the order in which they become ready, not in
/// the order in which they were received. Consequently
/// <see cref="DataflowBlockOptions.EnsureOrdered"/> does not preserve input order.
/// </para>
/// </remarks>
/// <typeparam name="TInput">Type of the items received.</typeparam>
/// <typeparam name="TKey">Type of the key identifying each item.</typeparam>
/// <typeparam name="TOutput">Type of the transformed results.</typeparam>
public class DependencyTransformBlock<TInput, TKey, TOutput> :
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    // Receives inputs, registers them in the dependency graph and forwards ready items.
    private readonly ITargetBlock<TInput> _inputBlock;
    // Runs the user transform; its output is the output of this block.
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;
    // Guards _items, _pendingCount and the mutable state of every Item node.
    private readonly object _locker = new object();
    private readonly Dictionary<TKey, Item> _items;
    // Outstanding units of work that must finish before the pipeline may complete.
    // The initial 1 represents the completion of _inputBlock. Each item handed to
    // _transformBlock adds 1, and processing that item removes the 1 again.
    private int _pendingCount = 1;

    /// <summary>Node of the dependency graph, one per key seen (as input or as dependency).</summary>
    private class Item
    {
        public TKey Key;
        public TInput Input;
        // False for placeholder nodes created when a key is referenced as a dependency
        // before (or without) its own input being received.
        public bool HasInput;
        // Set only after the user transform completed successfully.
        public bool IsCompleted;
        // Dependencies that had not completed yet when registered; null if none were declared.
        public HashSet<Item> Dependencies;
        // Items waiting on this one; null until the first dependent registers.
        public HashSet<Item> Dependents;
        public Item(TKey key) => Key = key;
    }

    /// <summary>
    /// Creates a block that applies an asynchronous <paramref name="transform"/> to each item
    /// once all of its dependencies have been processed.
    /// </summary>
    /// <param name="transform">Asynchronous transformation applied to each input.</param>
    /// <param name="keySelector">Returns the unique key of an input.</param>
    /// <param name="dependenciesSelector">
    /// Returns the keys of the inputs this input depends on. It may return null or an empty collection.
    /// </param>
    /// <param name="dataflowBlockOptions">
    /// Options of the internal transform stage; null uses the defaults. Its cancellation token
    /// is also applied to the internal input and buffering stages.
    /// </param>
    /// <param name="keyComparer">Key comparer; null uses <see cref="EqualityComparer{T}.Default"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="transform"/>, <paramref name="keySelector"/> or
    /// <paramref name="dependenciesSelector"/> is null.
    /// </exception>
    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;
        _items = new Dictionary<TKey, Item>(keyComparer);

        // Stage 1: register each input in the dependency graph. The bounded capacity of 1,
        // combined with the awaited SendAsync, lets back-pressure from _transformBlock
        // reach the senders of this block.
        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                // The node may already exist as a placeholder created by a dependent.
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;
                if (dependencyKeys != null && dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        // Create a placeholder for a dependency that has not been received yet.
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        // Only wait for dependencies that have not already finished.
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                // Count the item under the lock and before it is sent, so that the
                // pipeline cannot be completed while the item is in flight.
                if (isReadyForProcessing) _pendingCount++;
            }
            if (isReadyForProcessing)
            {
                await _transformBlock.SendAsync(item).ConfigureAwait(false);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = 1
        });

        // Stage 2: holds items released by the completion of their last dependency.
        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        // Stage 3: run the user transform, then release the dependents that became ready.
        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput && !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            // The Remove call is kept outside Debug.Assert so that it also
                            // runs in Release builds.
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            if (dependent.HasInput
                                && dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount++;
                            }
                        }
                    }
                    // Drop references that are no longer needed.
                    item.Input = default;
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return result;
            }
            finally
            {
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);
        // The buffer completes when the input is done AND no item is pending, i.e. when
        // _pendingCount reaches 0, either in this condition or in the transform's finally.
        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) return --_pendingCount == 0; });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    /// <summary>
    /// Creates a block that applies a synchronous <paramref name="transform"/> to each item
    /// once all of its dependencies have been processed.
    /// </summary>
    /// <param name="transform">Synchronous transformation applied to each input.</param>
    /// <param name="keySelector">Returns the unique key of an input.</param>
    /// <param name="dependenciesSelector">
    /// Returns the keys of the inputs this input depends on. It may return null or an empty collection.
    /// </param>
    /// <param name="dataflowBlockOptions">Options of the internal transform stage; null uses the defaults.</param>
    /// <param name="keyComparer">Key comparer; null uses <see cref="EqualityComparer{T}.Default"/>.</param>
    /// <exception cref="ArgumentNullException">Any of the delegate arguments is null.</exception>
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
        : this(ToAsync(transform), keySelector, dependenciesSelector,
            dataflowBlockOptions, keyComparer)
    {
    }

    // Validates the synchronous transform before the main constructor builds the pipeline,
    // then adapts it to the asynchronous signature.
    private static Func<TInput, Task<TOutput>> ToAsync(Func<TInput, TOutput> transform)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        return input => Task.FromResult(transform(input));
    }

    /// <summary>
    /// Inputs that have not been transformed successfully. This includes items in a dependency
    /// cycle, items depending on a key that was never received, items whose transform failed,
    /// and items still waiting or in flight. It is most meaningful after
    /// <see cref="Completion"/> has completed.
    /// </summary>
    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker) return _items.Values
                .Where(item => item.HasInput && !item.IsCompleted)
                .Select(item => item.Input)
                .ToArray();
        }
    }

    /// <summary>Completes when the internal transform stage completes.</summary>
    public Task Completion => _transformBlock.Completion;

    /// <summary>Signals that no more inputs will be offered.</summary>
    public void Complete() => _inputBlock.Complete();

    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source, bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(
        DataflowMessageHeader header, ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(
        DataflowMessageHeader header, ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(
        DataflowMessageHeader header, ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    /// <summary>Links the output of this block to <paramref name="target"/>.</summary>
    public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    // When source completes, faults target with the first inner exception of a faulted source.
    // Otherwise (success or cancellation) it completes target, but only if condition
    // (when supplied) returns true. The empty catch is deliberate: the outcome is inspected
    // through source.Completion afterwards.
    private async void PropagateCompletion(
        IDataflowBlock source, IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else if (condition == null || condition())
            target.Complete();
    }

    // Faults target with the first inner exception if source faults; does nothing otherwise.
    private async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}
用法示例:
// Usage: process items in parallel, each one only after the items it depends on.
// Keys (item names) are compared case-insensitively.
var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};
var block = new DependencyTransformBlock<Item, string, Item>(
    transform: item =>
    {
        DoWork(item);
        return item;
    },
    keySelector: item => item.Name,
    dependenciesSelector: item => item.DependsOn,
    dataflowBlockOptions: options,
    keyComparer: StringComparer.OrdinalIgnoreCase);
//...
// Discard the outputs, which makes the block behave like an ActionBlock.
block.LinkTo(DataflowBlock.NullTarget<Item>());
在此示例中,该块链接到 NullTarget<Item>,以丢弃其输出,因此它实际上相当于一个 ActionBlock。