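Problem description
How can I get the latest value for each unique key?
In this example I have stock symbols and prices. How can I combine the events so that the output contains each unique stock with its latest price?
In real life this series of events goes on for years, and I only need each stock's current price.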
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace LearnRx
{
    class Program
    {
        public class Stock
        {
            public string Symbol { get; set; }
            public float Price { get; set; }
            public DateTime ModifiedAt { get; set; }

            public override string ToString()
            {
                return "Stock: " + Symbol + " " + Price + " " + ModifiedAt;
            }
        }

        static void Main(string[] args)
        {
            var baseTime = new DateTime(2000, 1, 1);
            var fiveMinutes = TimeSpan.FromMinutes(5);
            var stockEvents = new Stock[]
            {
                new Stock
                {
                    Symbol = "MSFT", Price = 100, ModifiedAt = baseTime
                },
                new Stock
                {
                    Symbol = "GOOG", Price = 200, ModifiedAt = baseTime + fiveMinutes
                },
                new Stock
                {
                    Symbol = "MSFT", Price = 150, ModifiedAt = baseTime + fiveMinutes + fiveMinutes
                },
                new Stock
                {
                    Symbol = "AAPL", Price = 300, ModifiedAt = baseTime + fiveMinutes + fiveMinutes + fiveMinutes
                },
            }.ToObservable();

            var scheduler = new HistoricalScheduler();
            var replay = Observable.Generate(
                stockEvents.GetEnumerator(),
                events => events.MoveNext(),
                events => events,
                events => events.Current,
                events => events.Current.ModifiedAt,
                scheduler);

            replay
                .Subscribe(i => Console.WriteLine($"Event: {i} happened at {scheduler.Now}"));

            scheduler.Start();
        }
    }
}
Solution
It sounds like you want a GroupBy followed by a CombineLatest overload that should exist but doesn't.
Here is the missing CombineLatest:
using System;
using System.Collections.Immutable;
using System.Reactive;
using System.Reactive.Linq;

public static class X
{
    public static IObservable<ImmutableDictionary<TKey, TValue>> CombineLatest<TKey, TValue>(
        this IObservable<IGroupedObservable<TKey, TValue>> source)
    {
        return source
            // Tag every notification of every group with the group's key.
            .SelectMany(o => o.Materialize().Select(n => (Notification: n, Key: o.Key)))
            // Fold the tagged notifications into a dictionary of latest values.
            .Scan(default(Notification<ImmutableDictionary<TKey, TValue>>), (stateNotification, t) =>
            {
                var state = stateNotification?.Value ?? ImmutableDictionary<TKey, TValue>.Empty;
                switch (t.Notification.Kind)
                {
                    case NotificationKind.OnError:
                        // An error in any group terminates the combined stream.
                        return Notification.CreateOnError<ImmutableDictionary<TKey, TValue>>(t.Notification.Exception);
                    case NotificationKind.OnCompleted:
                        // A completed group drops its key from the dictionary.
                        return Notification.CreateOnNext(state.Remove(t.Key));
                    case NotificationKind.OnNext:
                        // A new value replaces the previous value for that key.
                        return Notification.CreateOnNext(state.SetItem(t.Key, t.Notification.Value));
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize();
    }
}
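The heart of the operator is the Scan step, which folds each keyed update into an immutable dictionary. As a rough illustration only, the same accumulation can be sketched over a plain in-memory sequence with no Rx involved. `LatestByKey` and `Snapshots` are hypothetical names made up for this sketch; they are not part of the answer or of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;

public static class LatestByKey
{
    // Hypothetical helper: yields one dictionary snapshot per update.
    // Each snapshot maps every key seen so far to its most recent value.
    public static IEnumerable<ImmutableDictionary<TKey, TValue>> Snapshots<TKey, TValue>(
        IEnumerable<(TKey Key, TValue Value)> updates)
    {
        var state = ImmutableDictionary<TKey, TValue>.Empty;
        foreach (var (key, value) in updates)
        {
            // SetItem adds the key, or overwrites its previous value,
            // and returns a new dictionary; earlier snapshots are untouched.
            state = state.SetItem(key, value);
            yield return state;
        }
    }
}
```

Fed the four events from the question, the last snapshot holds MSFT at 150, GOOG at 200 and AAPL at 300. Because the dictionary is immutable, each snapshot can be handed downstream without defensive copying.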
Then the final query in Main:
var prices = replay
    .GroupBy(s => s.Symbol)
    .CombineLatest();
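This gives you an IObservable<ImmutableDictionary<string,Stock>> that emits an updated dictionary at every price point. That should let you do whatever you need downstream.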
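To see what the stream of snapshots looks like, here is a minimal self-contained sketch that runs the query over four hard-coded ticks and collects each snapshot as text. It assumes the System.Reactive package is referenced. The `X.CombineLatest` operator is copied from the answer so the sketch compiles on its own; `Demo`, `Run` and the tuple-based ticks are hypothetical stand-ins for the question's `Stock` events and `replay` stream.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

public static class X
{
    // Copied from the answer so that this sketch compiles on its own.
    public static IObservable<ImmutableDictionary<TKey, TValue>> CombineLatest<TKey, TValue>(
        this IObservable<IGroupedObservable<TKey, TValue>> source)
    {
        return source
            .SelectMany(o => o.Materialize().Select(n => (Notification: n, Key: o.Key)))
            .Scan(default(Notification<ImmutableDictionary<TKey, TValue>>), (stateNotification, t) =>
            {
                var state = stateNotification?.Value ?? ImmutableDictionary<TKey, TValue>.Empty;
                switch (t.Notification.Kind)
                {
                    case NotificationKind.OnError:
                        return Notification.CreateOnError<ImmutableDictionary<TKey, TValue>>(t.Notification.Exception);
                    case NotificationKind.OnCompleted:
                        return Notification.CreateOnNext(state.Remove(t.Key));
                    case NotificationKind.OnNext:
                        return Notification.CreateOnNext(state.SetItem(t.Key, t.Notification.Value));
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize();
    }
}

public static class Demo
{
    // Hypothetical helper: runs the query and returns every emitted
    // snapshot formatted as "SYMBOL=price, ..." with symbols sorted.
    public static List<string> Run()
    {
        var ticks = new[]
        {
            (Symbol: "MSFT", Price: 100f),
            (Symbol: "GOOG", Price: 200f),
            (Symbol: "MSFT", Price: 150f),
            (Symbol: "AAPL", Price: 300f),
        }.ToObservable();

        var lines = new List<string>();
        ticks
            .GroupBy(t => t.Symbol, t => t.Price)
            .CombineLatest()
            .Subscribe(snapshot => lines.Add(string.Join(", ",
                snapshot.OrderBy(kv => kv.Key, StringComparer.Ordinal)
                        .Select(kv => $"{kv.Key}={kv.Value}"))));
        return lines;
    }
}
```

The first four entries returned by `Demo.Run()` are `MSFT=100`, `GOOG=200, MSFT=100`, `GOOG=200, MSFT=150` and `AAPL=300, GOOG=200, MSFT=150`. Because this source is finite, every group then completes and the operator removes its key, so further, shrinking snapshots follow. With a long-running feed that only matters for symbols whose group actually ends.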