scala – groupBy的子流是否可以依赖于它们生成的密钥?

我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取.

我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素.

如果我不想合并下游的子流,我可以使用groupBy和Sink.lazyInit做一些事情:

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element,_] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue,getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),??? // this is never called,since the subflow is created when an element comes
  )
)

但是,如果treatUser成为Flow,则这不起作用,因为Sink.lazyInit没有等价物.

由于groupBy的子流仅在推送新元素时才具体化,因此应该可以使用此元素来实现子流,但是我无法调整groupBy的源代码以使此工作始终如一.同样,Sink.lazyInitdo似乎不容易转换为Flow案例.

有关如何解决这个问题的任何想法?

解决方法

您需要关注的相关Akka问题是 #20129: add Sink.dynamic and Flow.dynamic.

在相关的PR #20579中,他们实际上实现了LazySink的东西.

他们计划下一步做LazyFlow:

Will do next lazyFlow with similar signature.

不幸的是,你必须等待在Akka中实现该功能或自己编写(然后考虑公关到Akka).

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...