问题描述
我正在尝试使用数据流块,并且需要监视通过单元测试的项目。
为此,我在AsObservable()
的{{1}}上使用了ISourceBlock<T>
方法,
这样我就可以在执行后检查管道的每个块都生成了期望值。
管道
TransformBlock<Tinput,T>
MyObserver
{
...
var observer = new MyObserver<string>();
_block = new TransformManyBlock<string,string>(MyHandler,options);
_block.LinkTo(_nextBlock);
_block.AsObservable().Subscribe(observer);
_block.Post("Test");
...
}
因此,基本上,我将观察者订阅了transformblock,并且希望传递的每个值都在观察者的“值”列表中注册。
但是,尽管public class MyObserver<T> : IObserver<T>
{
public List<Exception> Errors = new List<Exception>();
public bool IsComplete = false;
public List<T> Values = new List<T>();
public void OnCompleted()
{
IsComplete = true;
}
public void OnNext(T value)
{
Values.Add(value);
}
public void OnError(Exception e)
{
Errors.Add(e);
}
}
设置为true,并且IsComplete
成功注册了异常,
除非它是管道的最后一块,否则永远不会调用OnError()
方法。
我不知道为什么,因为链接到此sourceBlock的“ nextblock”成功接收到数据,证明某些数据正在退出该块。
据我了解,OnNext()
应该报告退出该块的每个值,而不只是报告尚未由其他链接块使用的值...
我在做什么错了?
解决方法
_nextBlock
正在消耗您的邮件,然后您才有机会阅读它们。
如果您对此行_block.LinkTo(_nextBlock);
进行注释,则可能会起作用。
AsObservable
的唯一目的只是允许从 RX 中消费 块。它不会更改将 broadcast 消息发送到多个 target 的功能块的内部工作。您需要为此BroadcastBlock
我建议广播到另一个 block 并将其用于Subscribe
BroadcastBlock的人生使命是使所有与 获取每个发布元素的副本的块
var options = new DataflowLinkOptions {PropagateCompletion = true};
var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));
broadcastBlock.LinkTo(bufferBlock,options);
broadcastBlock.LinkTo(actionBlock,options);
bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));
for (var i = 0; i < 5; i++)
await broadcastBlock.SendAsync(i.ToString());
broadcastBlock.Complete();
await actionBlock.Completion;
输出
peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4