问题描述
我的代码需要有 GroupBy
,并且每组 BehaviorSubject
需要一个唯一的 Switch()
。
我们有一个按 Symbol
分组的股票市场价值流,并在多个级别之间执行水平交叉(由 BehaviorSubject
和一个始终使用最新值的开关定义)。
所以我需要从这里开始:
var Feed = new Subject<double>();
var levels = new BehaviorSubject<double[]>(new[] { 400.0,500.0,600.0,700.0 });
levels
.Select(thresholds => Feed
.Buffer(2,1)
.Where(x => x.Count == 2)
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]),PrevIoUs = x[0],Current = x[1] })
.Where(x => x.LevelsCrossed.Any())
.SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.PrevIoUs,x.Current))))
.Switch()
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
并调整上述内容以获取下面的 Tick
流并按 Symbol
分组,每个分组在每个分组的 Value
上都有自己的级别阈值检测。
class Tick
{
public string Symbol { get; set; } // The name.
public decimal Value { get; set; } // The value.
}
大纲:
通过一个简单的实现,我有一个包装类(下面的ReactiveSymbolFeed
),但是模糊非反应性和反应性代码可能会引入潜在的并发问题,否则反应性扩展可以巧妙地处理这些问题。
问题:
我是否引入了任何副作用,或者这会导致大规模问题(比如 2,000 个群组中每秒 100,000 条消息)?
由于我们有许多组,每个组都有自己的 BehaviorSubject 需要 Switch() - 我们可以重写我们的反应式扩展语句块以包含每个符号组的阈值级别,或者上面的包装类是正确的方法吗?
进一步的上下文和包装类解决方案
相反,我创建了一个 ReactiveSymbolFeed
包装器,它将为每个符号键构成字典的值部分。
class ReactiveSymbolFeed
{
readonly BehaviorSubject<double[]> levels;
readonly Subject<double> Feed;
public ReactiveSymbolFeed(double[] levels)
{
this.Feed = new Subject<double>();
this.levels = new BehaviorSubject<double[]>(levels);
this.levels
.Select(thresholds => this.Feed
.Buffer(2,1)
.Where(x => x.Count == 2)
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0],Current = x[1] })
.Where(x => x.LevelsCrossed.Any())
.SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.Current))))
.Switch()
.distinctUntilChanged(x => x.Threshold)
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
}
public void OnNext(double value) => this.Feed.OnNext(value);
public void UpdateThresholds(double[] levels) => this.levels.OnNext(levels);
}
然后与以下一起使用:
// Setup the detection thresholds per Symbol - each Symbol has 1 set of thresholds
var dictionary = new Dictionary<string,ReactiveSymbolFeed>();
dictionary.Add("AAPL",new ReactiveSymbolFeed(new[] { 120.0,125.0,130.0 }));
dictionary.Add("VXX",new ReactiveSymbolFeed(new[] { 10.5,15,18.5,20 }));
// Create some test tick data.
var ticks = new[]
{
new Tick { Symbol = "AAPL",Value = 119.0 },new Tick { Symbol = "VXX",Value = 10.3 },Value = 10.8 },new Tick { Symbol = "AAPL",Value = 121.0 },Value = 121.0 }
// Followed by many other differnet Symbols and Values
};
// Loop through test data and dispatch it.
foreach(var tick in ticks)
{
if(dictionary.TryGetValue(tick.Symbol,out var value))
value.OnNext(tick.Value);
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)