问题描述
我尝试编写控制台可观察对象,如下例所示,但它不起作用。订阅存在一些问题。如何解决这些问题?
static class Program
{
static async Task Main(string[] args)
{
// var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
// var observable = FromConsole().Publish().RefCount(); // doesn't work
var observable = FromConsole(); // doesn't work
observable.Subscribe(Console.WriteLine);
await Task.Delay(1500);
observable.Subscribe(Console.WriteLine);
await new taskcompletionsource().Task;
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
如果我使用 Observable.Interval
,它会订阅两次,并且我对一个输入有两个输出。如果我使用了 FromConsole
的任何版本,我都会有一个订阅和一个被阻止的线程。
解决方法
首先,通常最好避免使用 Observable.Create
来创建 observables - 它肯定是出于这个目的,但它可以创建不像你认为的那样表现的 observables,因为它们的阻塞性质.正如你所发现的!
相反,在可能的情况下,使用内置运算符来创建可观察对象。在这种情况下可以做到这一点。
我的 FromConsole
版本是这样的:
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
Observable.Start
实际上就像 Task.Run
对于 observables。它为我们调用 Console.ReadLine()
而不会阻塞。
Observable.Defer
/Repeat
对重复调用 Observable.Start(() => Console.ReadLine())
。如果没有 Defer
,它只会调用 Observable.Start
并永远重复返回一个字符串。
这就解决了。
现在,第二个问题是您想要查看 Console.ReadLine()
可观察对象订阅的 FromConsole()
输出的值。
由于 Console.ReadLine
的工作方式,您从每个订阅中获取值,但一次只能获取一个。试试这个代码:
static async Task Main(string[] args)
{
var observable = FromConsole();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
当我运行时,我得到这样的输出:
1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf
这样做的原因是每个订阅都会启动对 FromConsole
的新订阅。所以你有两个对 Console.ReadLine()
的调用,它们有效地排队,每个调用只得到每个备用输入。因此 1
和 2
之间的交替。
因此,要解决这个问题,您只需要 .Publish().RefCount()
运算符对。
试试这个:
static async Task Main(string[] args)
{
var observable = FromConsole().Publish().RefCount();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
我现在得到:
1:Hello
2:Hello
1:World
2:World
简而言之,正是非阻塞 FromConsole
observable 和 .Publish().RefCount()
的使用相结合,使这项工作符合您的预期。
问题在于 Some("10-30").flatMap(getValueFromString)
// res: Option[Double] = Some(10.0d)
是一种阻塞方法,因此对 Console.ReadLine
序列的订阅会无限期阻塞,因此永远不会到达 FromConsole
行。您可以通过异步读取控制台来解决此问题,将阻塞调用卸载到 await Task.Delay(1500);
线程:
ThreadPool
您可以查看 this 问题,了解为什么没有比卸载更好的解决方案。
附带说明,订阅序列而不提供 static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(await Task.Run(() => Console.ReadLine()));
}
});
}
处理程序 is not a good idea,除非进程因未处理的异常而崩溃是您的应用可以接受的行为。使用 onError
生成的序列尤其成问题,因为它可能导致像这样的奇怪/错误行为:Async Create hanging while publishing observable。
你需要在没有发布的情况下返回一个 observable。然后你可以订阅它并进一步做你的事情。这是一个例子。当我运行它时,我可以多次阅读。
public class Program
{
static void Main(string[] args)
{
FromConsole().Subscribe(x =>
{
Console.WriteLine(x);
});
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}