DataflowBlock ITargetSource.AsObservable不触发OnNext 管道 MyObserver

问题描述

我正在尝试使用数据流块,并且需要监视通过单元测试的项目。

为此,我在AsObservable()的{​​{1}}上使用了ISourceBlock<T>方法, 这样我就可以在执行后检查管道的每个块都生成了期望值。

管道

TransformBlock<Tinput,T>

MyObserver

{
   ...
   var observer = new MyObserver<string>();
   _block  = new TransformManyBlock<string,string>(MyHandler,options);
   _block.LinkTo(_nextBlock);
   _block.AsObservable().Subscribe(observer);
   _block.Post("Test");
   ...
}

因此,基本上,我将观察者订阅了transformblock,并且希望传递的每个值都在观察者的“值”列表中注册

但是,尽管public class MyObserver<T> : IObserver<T> { public List<Exception> Errors = new List<Exception>(); public bool IsComplete = false; public List<T> Values = new List<T>(); public void OnCompleted() { IsComplete = true; } public void OnNext(T value) { Values.Add(value); } public void OnError(Exception e) { Errors.Add(e); } } 设置为true,并且IsComplete成功注册了异常, 除非它是管道的最后一块,否则永远不会调用OnError()方法。 我不知道为什么,因为链接到此sourceBlock的“ nextblock”成功接收到数据,证明某些数据正在退出该块。

据我了解,OnNext()应该报告退出该块的每个值,而不只是报告尚未由其他链接块使用的值...

我在做什么错了?

解决方法

_nextBlock正在消耗您的邮件,然后您才有机会阅读它们。

如果您对此行_block.LinkTo(_nextBlock);进行注释,则可能会起作用。

AsObservable的唯一目的只是允许从 RX 中消费 块。它不会更改将 broadcast 消息发送到多个 target 的功能块的内部工作。您需要为此BroadcastBlock

设置一个特殊的块

我建议广播到另一个 block 并将其用于Subscribe

BroadcastBlock的人生使命是使所有与 获取每个发布元素的副本的块

var options = new DataflowLinkOptions {PropagateCompletion = true};


var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));

broadcastBlock.LinkTo(bufferBlock,options);
broadcastBlock.LinkTo(actionBlock,options);

bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));

for (var i = 0; i < 5; i++)
   await broadcastBlock.SendAsync(i.ToString());

broadcastBlock.Complete();
await actionBlock.Completion;

输出

peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4