问题描述
我有一个用例,其中有一个要从数据库中获取的值的列表和一个需要获取其值的日期的列表。我想使用akka流(带有GraphDSL的流或源)在它们之间建立一对多(或多对一)关系,以便为每个日期获取每个值
例如,
动物=牛,山羊,绵羊
years = 2018,2019
预期的流输出为
牛&2018
山羊&2018
绵羊&2018
牛&2019
山羊&2019
绵羊&2019
解决方法
如果您想要这样的产品,则不需要Graph DSL。
def animalsAndYears(animals: Source[Animal,NotUsed],years: Source[Year,NotUsed]): Source[(Animal,Year),NotUsed] =
years.flatMapConcat { year =>
animals.map { animal =>
animal -> year
}
}
所以:
animalsAndYears(Source(listOfAnimals),Source(listOfYears))
将为您提供animal
,year
元组的流。假设您有一个函数:
def queryDBForAnimalYear(aandy: (Animal,Year)): Future[Seq[Row]] = ???
然后,您可以使用以下命令获取行流:
val parallelism: Int = ??? // How many queries to have in-flight at a time
animalsAndYears(Source(listOfAnimals),Source(listOfYears))
.mapAsync(parallelism) { params => queryDBForAnimalYear(params) }
.mapConcat(identity) // gives you a Source[Row]