如何使用 RxPy group_by_until() 运算符?

问题描述

这是我尝试使用响应式编程(使用 RxPy)解决的问题:

我有一个 MIDI 事件流(现场钢琴演奏),每个事件对应一个按下或释放的键。对于给定的音符,第一个事件是“按下”的,而后面的事件始终是“释放”的。当然,可以在释放另一个键之前按下任何键。

我想转换这个事件流,只保留“发布”事件以及“按下”和“释放”之间经过的时间。

因此,我发现 RX 是实现这一目标的完美候选者,我的目标是:

enter image description here

为了简单起见,我没有提到“press”和“release”属性的交替,也没有提到应该添加到结果流中的持续时间信息。

我的第一个赌注是使用 group_by() 运算符,结果是:

enter image description here

到目前为止,我的代码是这样的:

import rx
from rx.operators import filter,group_by

def is_note_message(event):
    return event['type']==9  // MIDI "Note On" event

def is_note_pressed(event):
    return event['veLocity']>0

def is_note_released(event):
    return event['veLocity']==0

(...)

note_obs = midi_events.pipe(
    filter( is_note_message ),group_by( lambda evt: evt['note'])
)

问题是我只想对给定笔记的 2 个事件(“新闻”和“发布”)进行分组,因此我想通过使用 group_by_until() 来更进一步。请注意,我在这里做了两个假设,这可能不正确:

  • 我认为这样做会使以后处理持续时间计算的过程更容易。
  • 我认为 group_by_until() 能够根据某些事件属性“停止”分组。

enter image description here

这就是我被卡住的地方:

  • 如果 group_by_until() 为真,我找不到 is_note_released() 停止分组的正确语法
  • 我不确定如何处理每个分组事件以计算持续时间,然后合并(使用 flat_map() ?)它们(但这可能是另一个问题)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)