每组的 BehaviorSubject 与 GroupBy 和 Switch()

问题描述

我的代码需要有 GroupBy,并且每组 BehaviorSubject 需要一个唯一的 Switch()

我们有一个Symbol 分组的股票市场价值流,并在多个级别之间执行水平交叉(由 BehaviorSubject一个始终使用最新值的开关定义)。

所以我需要从这里开始:

var Feed = new Subject<double>();
var levels = new BehaviorSubject<double[]>(new[] { 400.0,500.0,600.0,700.0 });

levels
    .Select(thresholds => Feed
        .Buffer(2,1)
        .Where(x => x.Count == 2)
        .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]),PrevIoUs = x[0],Current = x[1] })
        .Where(x => x.LevelsCrossed.Any())
        .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.PrevIoUs,x.Current))))
    .Switch()
    .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

并调整上述内容获取下面的 Tick 流并按 Symbol 分组,每个分组在每个分组的 Value 上都有自己的级别阈值检测。

class Tick
{
    public string Symbol { get; set; } // The name.
    public decimal Value { get; set; } // The value.
}

大纲:

  1. 获取市场数据
  2. 按符号分组
  3. 警报级别(取决于组名,使用 BehaviorSubject 字典)
  4. 输出
  5. 使用 Switch() 始终使用字典中的最新值

通过一个简单的实现,我有一个包装类(下面的ReactiveSymbolFeed),但是模糊非反应性和反应性代码可能会引入潜在的并发问题,否则反应性扩展可以巧妙地处理这些问题。

问题:

我是否引入了任何副作用,或者这会导致大规模问题(比如 2,000 个群组中每秒 100,000 条消息)?

由于我们有许多组,每个组都有自己的 BehaviorSubject 需要 Switch() - 我们可以重写我们的反应式扩展语句块以包含每个符号组的阈值级别,或者上面的包装类是正确的方法吗?

进一步的上下文和包装类解决方

相反,我创建了一个 ReactiveSymbolFeed 包装器,它将为每个符号键构成字典的值部分。

class ReactiveSymbolFeed
{
    readonly BehaviorSubject<double[]> levels;
    readonly Subject<double> Feed;

    public ReactiveSymbolFeed(double[] levels)
    {
        this.Feed = new Subject<double>();
        this.levels = new BehaviorSubject<double[]>(levels);

        this.levels
            .Select(thresholds => this.Feed
            .Buffer(2,1)
            .Where(x => x.Count == 2)
            .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0],Current = x[1] })
            .Where(x => x.LevelsCrossed.Any())
            .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.Current))))
         .Switch()
         .distinctUntilChanged(x => x.Threshold)
         .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
    }

    public void OnNext(double value) => this.Feed.OnNext(value);

    public void UpdateThresholds(double[] levels) => this.levels.OnNext(levels);
}

然后与以下一起使用:

// Setup the detection thresholds per Symbol - each Symbol has 1 set of thresholds
var dictionary = new Dictionary<string,ReactiveSymbolFeed>();            
dictionary.Add("AAPL",new ReactiveSymbolFeed(new[] { 120.0,125.0,130.0 }));
dictionary.Add("VXX",new ReactiveSymbolFeed(new[] { 10.5,15,18.5,20 }));

// Create some test tick data.
var ticks = new[]
{
    new Tick { Symbol = "AAPL",Value = 119.0 },new Tick { Symbol = "VXX",Value = 10.3 },Value = 10.8 },new Tick { Symbol = "AAPL",Value = 121.0 },Value = 121.0 }
    // Followed by many other differnet Symbols and Values
};

// Loop through test data and dispatch it.
foreach(var tick in ticks)
{
    if(dictionary.TryGetValue(tick.Symbol,out var value))
        value.OnNext(tick.Value);
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...