使用Akka流一对多合并

问题描述

我有一个用例,其中有一个要从数据库获取的值的列表和一个需要获取其值的日期的列表。我想使用akka流(带有GraphDSL的流或源)在它们之间建立一对多(或多对一)关系,以便为每个日期获取每个值

例如,

动物=牛,山羊,绵羊

years = 2018,2019

预期的流输出

牛&2018

山羊&2018

绵羊&2018

牛&2019

山羊&2019

绵羊&2019

解决方法

如果您想要这样的产品,则不需要Graph DSL。

def animalsAndYears(animals: Source[Animal,NotUsed],years: Source[Year,NotUsed]): Source[(Animal,Year),NotUsed] =
  years.flatMapConcat { year =>
    animals.map { animal =>
      animal -> year
    }
  }

所以:

 animalsAndYears(Source(listOfAnimals),Source(listOfYears))

将为您提供animalyear元组的流。假设您有一个函数:

 def queryDBForAnimalYear(aandy: (Animal,Year)): Future[Seq[Row]] = ???

然后,您可以使用以下命令获取行流:

val parallelism: Int = ??? // How many queries to have in-flight at a time
animalsAndYears(Source(listOfAnimals),Source(listOfYears))
  .mapAsync(parallelism) { params => queryDBForAnimalYear(params) }
  .mapConcat(identity)  // gives you a Source[Row]