如何从控制台输入制作 IObservable<string>

问题描述

我尝试编写控制台可观察对象,如下例所示,但它不起作用。订阅存在一些问题。如何解决这些问题?

static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new taskcompletionsource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

如果我使用 Observable.Interval,它会订阅两次,并且我对一个输入有两个输出。如果我使用了 FromConsole 的任何版本,我都会有一个订阅一个被阻止的线程。

解决方法

首先,通常最好避免使用 Observable.Create 来创建 observables - 它肯定是出于这个目的,但它可以创建不像你认为的那样表现的 observables,因为它们的阻塞性质.正如你所发现的!

相反,在可能的情况下,使用内置运算符来创建可观察对象。在这种情况下可以做到这一点。

我的 FromConsole 版本是这样的:

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();

Observable.Start 实际上就像 Task.Run 对于 observables。它为我们调用 Console.ReadLine() 而不会阻塞。

Observable.Defer/Repeat 对重复调用 Observable.Start(() => Console.ReadLine())。如果没有 Defer,它只会调用 Observable.Start 并永远重复返回一个字符串。

这就解决了。

现在,第二个问题是您想要查看 Console.ReadLine() 可观察对象订阅的 FromConsole() 输出的值。

由于 Console.ReadLine 的工作方式,您从每个订阅中获取值,但一次只能获取一个。试试这个代码:

static async Task Main(string[] args)
{
    var observable = FromConsole();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

当我运行时,我得到这样的输出:

1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf

这样做的原因是每个订阅都会启动对 FromConsole 的新订阅。所以你有两个对 Console.ReadLine() 的调用,它们有效地排队,每个调用只得到每个备用输入。因此 12 之间的交替。

因此,要解决这个问题,您只需要 .Publish().RefCount() 运算符对。

试试这个:

static async Task Main(string[] args)
{
    var observable = FromConsole().Publish().RefCount();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

我现在得到:

1:Hello
2:Hello
1:World
2:World

简而言之,正是非阻塞 FromConsole observable 和 .Publish().RefCount() 的使用相结合,使这项工作符合您的预期。

,

问题在于 Some("10-30").flatMap(getValueFromString) // res: Option[Double] = Some(10.0d) 是一种阻塞方法,因此对 Console.ReadLine 序列的订阅会无限期阻塞,因此永远不会到达 FromConsole 行。您可以通过异步读取控制台来解决此问题,将阻塞调用卸载到 await Task.Delay(1500); 线程:

ThreadPool

您可以查看 this 问题,了解为什么没有比卸载更好的解决方案。

附带说明,订阅序列而不提供 static IObservable<string> FromConsole() { return Observable.Create<string>(async observer => { while (true) { observer.OnNext(await Task.Run(() => Console.ReadLine())); } }); } 处理程序 is not a good idea,除非进程因未处理的异常而崩溃是您的应用可以接受的行为。使用 onError 生成的序列尤其成问题,因为它可能导致像这样的奇怪/错误行为:Async Create hanging while publishing observable

,

你需要在没有发布的情况下返回一个 observable。然后你可以订阅它并进一步做你的事情。这是一个例子。当我运行它时,我可以多次阅读。

public class Program
{

    static void Main(string[] args)
    {
        FromConsole().Subscribe(x =>
        {
            Console.WriteLine(x);
        });
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...