我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取.
我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素.
如果我不想合并下游的子流,我可以使用groupBy和Sink.lazyInit做一些事情:
def getState(userId: UserId): Future[UserState] = ... def getUserId(element: Element): UserId = ... def treatUser(state: UserState): Sink[Element,_] = ... val treatByUser: Sink[Element] = Flow[Element].groupBy( Int.MaxValue,getUserId ).to( Sink.lazyInit( elt => getState(getUserId(elt)).map(treatUser),??? // this is never called,since the subflow is created when an element comes ) )
但是,如果treatUser成为Flow,则这不起作用,因为Sink.lazyInit没有等价物.
由于groupBy的子流仅在推送新元素时才具体化,因此应该可以使用此元素来实现子流,但是我无法调整groupBy的源代码以使此工作始终如一.同样,Sink.lazyInitdo似乎不容易转换为Flow案例.
有关如何解决这个问题的任何想法?
解决方法
您需要关注的相关Akka问题是
#20129: add Sink.dynamic and Flow.dynamic.
在相关的PR #20579中,他们实际上实现了LazySink的东西.
他们计划下一步做LazyFlow:
Will do next lazyFlow with similar signature.
不幸的是,你必须等待在Akka中实现该功能或自己编写(然后考虑公关到Akka).