问题描述
这是我尝试使用响应式编程(使用 RxPy)解决的问题:
我有一个 MIDI 事件流(现场钢琴演奏),每个事件对应一个按下或释放的键。对于给定的音符,第一个事件是“按下”的,而后面的事件始终是“释放”的。当然,可以在释放另一个键之前按下任何键。
我想转换这个事件流,只保留“发布”事件以及“按下”和“释放”之间经过的时间。
因此,我发现 RX 是实现这一目标的完美候选者,我的目标是:
为了简单起见,我没有提到“press”和“release”属性的交替,也没有提到应该添加到结果流中的持续时间信息。
我的第一个赌注是使用 group_by()
运算符,结果是:
到目前为止,我的代码是这样的:
import rx
from rx.operators import filter,group_by
def is_note_message(event):
return event['type']==9 // MIDI "Note On" event
def is_note_pressed(event):
return event['veLocity']>0
def is_note_released(event):
return event['veLocity']==0
(...)
note_obs = midi_events.pipe(
filter( is_note_message ),group_by( lambda evt: evt['note'])
)
问题是我只想对给定笔记的 2 个事件(“新闻”和“发布”)进行分组,因此我想通过使用 group_by_until()
来更进一步。请注意,我在这里做了两个假设,这可能不正确:
- 我认为这样做会使以后处理持续时间计算的过程更容易。
- 我认为
group_by_until()
能够根据某些事件属性“停止”分组。
这就是我被卡住的地方:
- 如果
group_by_until()
为真,我找不到is_note_released()
停止分组的正确语法 - 我不确定如何处理每个分组事件以计算持续时间,然后合并(使用
flat_map()
?)它们(但这可能是另一个问题)
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)