使用 Rx,如何获取每个唯一键的最新值?

问题描述

如何获取每个唯一键的最新值?

在这个示例中,我有股票代码和价格。我如何组合它才能输出具有最新价格的独特股票?

在现实生活中,这一系列事件会持续数年,而我只需要股票的当前价格。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace LearnRx
{
    class Program
    {
        public class Stock
        {
            public string Symbol { get; set; }
            public float Price { get; set; }
            public DateTime ModifiedAt { get; set; }

            public override string ToString()
            {
                return "Stock: " + Symbol + " " + Price + " " + ModifiedAt;
            }
        }

        static void Main(string[] args)
        {
            var baseTime = new DateTime(2000,1,1);
            var fiveMinutes = TimeSpan.FromMinutes(5);
            var stockEvents = new Stock[]
            {
                new Stock
                {
                    Symbol = "MSFT",Price = 100,ModifiedAt = baseTime
                },new Stock
                {
                    Symbol = "GOOG",Price = 200,ModifiedAt = baseTime + fiveMinutes
                },new Stock
                {
                    Symbol = "MSFT",Price = 150,ModifiedAt = baseTime + fiveMinutes + fiveMinutes
                },new Stock
                {
                    Symbol = "AAPL",Price = 300,ModifiedAt = baseTime + fiveMinutes + fiveMinutes + fiveMinutes
                },}.ToObservable();

            var scheduler = new HistoricalScheduler();
            var replay = Observable.Generate(
                stockEvents.GetEnumerator(),events => events.MoveNext(),events => events,events => events.Current,events => events.Current.ModifiedAt,scheduler);


            replay
                .Subscribe( i => Console.WriteLine($"Event: {i} happened at {scheduler.Now}"));

            scheduler.Start();
        }
    }
}

解决方法

听起来您想要一个 GroupBy 后跟一个应该存在但不存在的 CombineLatest

这是缺失的 CombineLatest

public static class X
{
    
    public static IObservable<ImmutableDictionary<TKey,TValue>> CombineLatest<TKey,TValue>(this IObservable<IGroupedObservable<TKey,TValue>> source)
    {
        return source
            .SelectMany(o => o.Materialize().Select(n => (Notification: n,Key: o.Key)))
            .Scan(default(Notification<ImmutableDictionary<TKey,TValue>>),(stateNotification,t) => {
                var state = stateNotification?.Value ?? ImmutableDictionary<TKey,TValue>.Empty;
                switch(t.Notification.Kind) {
                    case NotificationKind.OnError:
                        return Notification.CreateOnError<ImmutableDictionary<TKey,TValue>>(t.Notification.Exception);
                    case NotificationKind.OnCompleted:
                        return Notification.CreateOnNext(state.Remove(t.Key));
                    case NotificationKind.OnNext:
                        return Notification.CreateOnNext(state.SetItem(t.Key,t.Notification.Value));
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize();
    }
}

然后是 Main 中的最终查询:

var prices = replay
    .GroupBy(s => s.Symbol)
    .CombineLatest()

这将为您提供每个价格点的 Observable<Dictionary<string,Stock>>。这应该使您能够在下游执行任何操作。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...