反应式扩展第一个限制值

问题描述

我想从节流返回第一个值。

在下面的代码中,我希望在大约4秒钟后返回值2

相反,我看到的是值2在大约11秒后返回。直到Events可枚举完成后,整个可观察对象才会完成。为什么FirstAsync不早退?我该如何使其工作?

如果我删除油门线,则只会将事件1写入控制台,因此我认为这与引起不同行为的原因有关。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp11
{
    class Program
    {
        static void Main()
        {
            static IEnumerable<int> Events()
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 1");
                yield return 1;

                Thread.Sleep(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 2");
                yield return 2;

                Thread.Sleep(TimeSpan.FromSeconds(9));
                Console.WriteLine("Event 3");
                yield return 3;
            }

            var stopwatch = new Stopwatch();

            stopwatch.Start();
            var result = Events()
                .ToObservable()
                .Throttle(TimeSpan.FromSeconds(2))
                .Select(i =>
                {
                    Console.WriteLine($"Window complete ({i})");
                    return i;
                })
                .FirstAsync()
                .Wait();
            stopwatch.Stop();

            Console.WriteLine($"Observable complete ({result}): {stopwatch.Elapsed}");
            Console.ReadLine();
        }
    }
}

输出以下内容

Event 1
Event 2
Window complete (2)
Event 3
Observable complete (2): 00:00:11.1100222

解决方法

该生成器阻止了ToObservable中的代码

尝试一下:

using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace ConsoleApp11
{
    class Program
    {
        static async Task Main()
        {
            var events = Observable.Create<int>(async sub =>
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 1");
                sub.OnNext(1);

                await Task.Delay(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 2");
                sub.OnNext(2);

                await Task.Delay(TimeSpan.FromSeconds(9));
                Console.WriteLine("Event 3");
                sub.OnNext(3);

                return Disposable.Empty;
            });

          
            var stopwatch = new Stopwatch();

            stopwatch.Start();
            var result = await events
                .Throttle(TimeSpan.FromSeconds(2))
                .Select(i =>
                {
                    Console.WriteLine($"Window complete ({i})");
                    return i;
                })
                .FirstAsync();
            stopwatch.Stop();

            Console.WriteLine($"Observable complete ({result}): {stopwatch.Elapsed}");
            Console.ReadLine();
        }
    }
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...