问题描述
上次,我开始对订单实施bitbay.net订阅。
问题在于,bitbay正在返回订单增量,但我始终希望保留整个价格深度(因此,我必须保留整个价格深度,并在发生某些增量事件时进行更新):
bid ask bid ask
---------- -----------
A D ------------>delta-event(removed=D)---> A F
B F B G
C G C
所以我决定使用
Flux
.from(eventsFromBitbay)
.scan(FullPriceDepth.empty(),(pd,e) -> pd.update(e))
.subscription(...)
我的问题是,就效率和线程安全而言,Flux.scan(...)会是一个不错的选择吗?我说的是高速系统中的数百万个事件。
我的替代方法是制作一些Atomic...
并在Flux.create(...).map(e -> atomicHere)
中进行更新,还是有更好的选择?
Flux.scan()
比Atomic...
更有效率,为什么,为什么不呢?
解决方法
“我的问题是Flux.scan(...)将是一个很好的选择吗?”
当然,为什么不呢?如果你问我,这是一个明显的模式。您有一个包含处理助焊剂所需信息的类。不过,您应该牢记两件事,主要是通量的顺序很容易更改,例如使用Flux::flatMap
而不是Flux::flatMapSequential
,这样您就可以轻松地以任何顺序获取事物。另外,有人可以将流量放在多个线程上,因此您的FullPriceDepth
属性可能必须为并发问题编写代码。