问题描述
给定一系列具有超时趋势的数字,我想使用 Reactive Extensions 来在绝对变化突然出现峰值或下降时发出警报。即 101.2,102.4,101.4,100.9,95,93,85...
然后缓慢增加到 100。
警报将在从 100.9 下降到 95 时触发,每个都有一个时间戳,用于查找以下形式的警报:
大变 时间戳 距离 百分比
我相信我需要从 Buffer(60,1)
开始以获得 60 个样本移动平均值(样本之间的微小频率)。
虽然这会给出平均值,但我不能指定任意百分比来触发警报,因为这可能因信号而异 - 一个可能比另一个具有更大的波动性。
为了获得波动性,我将采用更长的历史时间范围 Buffer(14,1)
(这将是同一信号的 14 天每日平均值)。
然后我会计算缓冲区中的每个值与 14 天平均值之间的差异,平方并添加所有这些偏差,然后除以样本数。
我的问题是:
-
我将如何执行上述波动率计算,或者最好在 RX 之外执行此操作并在可观察流计算之外每天更新一次新的波动率值(这可能更有意义,以避免我有通过它运行 14 天的 1 分钟样本)?
-
我们如何结合快速移动平均线和波动率水平(每天更新一次)来发出警报?我在 SO 上的帖子中看到
Scan
和distinctUntilChanged
,但不知道如何组合。
解决方法
我首先将其分解为多个步骤。 (为简单起见,我假设原始数据源是一个名为 values
的 observable。)
- 将
values
转换为可观察的移动平均线(我们在这里称之为averages
)。 - 将
values
和averages
组合成一个可以观察“极端”的 observable。
对于第 1 步,您可以使用 Slugart 在评论中提到的内置 Window
方法或类似的 Buffer
方法。 Select
或 Window
之后的 Buffer
调用可用于将数组处理为单个平均值对象。类似的东西:
averages = values.Buffer(60,1)
.Select((buffer) => { /* do average and std dev calcuation here */ });
如果您需要滑动窗口,您可能需要实现自己的操作符,但我很容易不知道确实存在一个操作符。 Scan
连同队列似乎是此类运算符的良好基础,如果您需要编写它。
对于第 2 步,您可能希望以 CombineLatest
开头,后跟 Where
子句。类似的东西:
extremes = values.CombineLatest(averages,(v,a) => new { Current = v,Average = a })
.Where((value) = { /* check if value.Current is out of deviation from value.Average */ });
这种方法的好处在于,您可以选择是像我们在这里所做的那样直接根据行中的值计算平均值,或者是其他一些波动率信息来源,而对其余代码的影响最小。
请注意,CombineLatest 调用可能会导致两个对值的订阅,一个是直接订阅,另一个是通过订阅平均值间接订阅。如果值的底层实现不希望出现这种情况,请使用 Publish 和 RefCount 来解决这个问题。
还要注意,CombineLatest 将在每次 values 或 averages 输出一个值时输出一个值。这意味着每次平均值更新时您都会收到两个事件,一个用于值更新,另一个用于由值触发的平均值更新。
如果您使用滑动窗口,这意味着对每个值进行双重更新,最好在 Scan 输出中简单地包含当前值并完全跳过 CombineLatest。你会得到这样的东西:
averages = values.Scan((v) => { /* build sliding window and attach current value */ });
extremes = averages.Where((a) => { /* check if current value is out of deviation for the window */ });
拥有 extremes
后,您可以订阅它并触发您的提醒。