使用折叠的Akka流未完成

问题描述

我有一些大致如下的代码:其中A是两个Map的元组

def methodName(): Flow[A,B,NotUsed] = {
  val filter = Flow[A].map(a => a._2.slice(0,2))
  val split = Flow[A._2]
    .mapConcat(identity)
    .map(t => {
      B.random
    })
    .fold(B.empty)((a,b) => {
      new B(a._1,a._2 ++ Seq(b._1),a._3 ++ Seq(b._2),a._4)
    })

  val logK = Flow[B].log("K",c => {
    log.info("here")
  })
  filter.via(split).via(logK)
}

但是当我运行它时,流在折叠阶段停滞了,我不明白为什么。我可以确认A._2中的收藏已用尽,并且当我用其他操作替换折页时,流程继续进行并且没有被阻塞。据我所知,completeStage是由上游mapConcat调用的。因此,我不确定折叠阶段为何没有接到该电话并知道继续进行下一阶段。

解决方法

因此,这似乎是我使用的akka​​版本中的错误:akka:“ 2.5.23”,akkaHttp:“ 10.1.10”

当我升级到akka时:“ 2.6.8”和akkaHttpV =“ 10.2.0” 一切都按预期工作