斯卡拉 – 阿卡流.分组,聚合一段时间并发出结果

我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:

Source
    .tick(0 second,50 millis,() => if (Random.nextBoolean) (1,s"A") else (2,s"B"))
    .map { f => f() }
    .groupBy(10,_._1)
    // how to aggregate grouped elements here for two seconds?
    .scan(Seq[String]()) { (x,y) => x ++ Seq(y._2) }
    .to(Sink.foreach(println))

期望的输出应该如下所示:

Seq(A,A,A)
Seq(B,B,B)
Seq(A,B)
// and so on

如何使用流实现此类功能

解决方法

相关文章

迭代器模式(Iterator)迭代器模式(Iterator)[Cursor]意图...
高性能IO模型浅析服务器端编程经常需要构造高性能的IO模型,...
策略模式(Strategy)策略模式(Strategy)[Policy]意图:定...
访问者模式(Visitor)访问者模式(Visitor)意图:表示一个...
命令模式(Command)命令模式(Command)[Action/Transactio...
生成器模式(Builder)生成器模式(Builder)意图:将一个对...