(timestamp,session_uid,traffic)
即
... (1448089943,session-1,10) (1448089944,20) (1448089945,session-2,50) (1448089946,30) (1448089947,10) (1448089948,session-3,10) ...
这些事件我想按session_uid分组,并计算每个会话的流量总和.
我写了一个akka-streams流程,它可以很好地使用有限流使用groupBy(我的代码基于this示例来自cookbook).但是对于无限流,它将无法工作,因为groupBy函数应该处理所有传入流,并且只有在此之后才准备好返回结果.
我想我应该用超时实现分组,即如果我从最后一次超过5分钟没有收到指定stream_uid的事件,我应该返回此session_uid的分组事件.但是如何实现它只使用akka-streams?
解决方法
关键的想法是使用Source的keepAlive方法作为将触发完成的计时器.
但要做到这一点,我们首先必须抽象一些数据.计时器需要从原始Source发送触发器或另一个元组值,因此:
sealed trait Data object TimerTrigger extends Data case class Value(tstamp : Long,session_uid : String,traffic : Int) extends Data
然后将我们的元组源转换为值的来源.我们仍然会使用groupBy进行类似于有限流情况的分组:
val originalSource : Source[(Long,String,Int),Unit] = ??? type IDGroup = (String,Source[Value,Unit]) //uid -> Source of Values for uid val groupedDataSource : Source[IDGroup,Unit] = originalSource.map(t => Value(t._1,t._2,t._3)) .groupBy(_.session_uid)
棘手的部分是处理只是元组的分组:(String,Source [Value,Unit]).我们需要计时器通知我们时间是否已经过去所以我们需要另一个抽象来知道我们是否仍在计算或者由于超时我们已完成计算:
sealed trait Sum { val sum : Int } case class StillComputing(val sum : Int) extends Sum case class ComputedSum(val sum : Int) extends Sum val zeroSum : Sum = StillComputing(0)
现在我们可以消耗每个组的来源.如果值源在timeOut之后没有产生某些内容,keepAlive将发送TimerTrigger.然后,来自keepAlive的数据与TimerTrigger或来自原始Source的新值进行模式匹配:
val evaluateSum : ((Sum,Data)) => Sum = { case (runningSum,data) => { data match { case TimerTrigger => ComputedSum(runningSum.sum) case v : Value => StillComputing(runningSum.sum + v.traffic) } } }//end val evaluateSum type SumResult = (String,Future[Int]) // uid -> Future of traffic sum for uid def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = idGroup._1 -> idGroup._2.keepAlive(timeOut,() => TimerTrigger) .scan(zeroSum)(evaluateSum) .collect {case c : ComputedSum => c.sum} .runWith(Sink.head)
该集合应用于仅与完成的总和匹配的部分函数,因此只有在计时器触发后才会到达Sink.
然后,我们将此处理程序应用于每个出现的分组:
val timeOut = FiniteDuration(5,MINUTES) val sumSource : Source[SumResult,Unit] = groupedDataSource map handleGroup(timeOut)
我们现在有一个Source(String,Future [Int]),它是session_uid,是该id的流量总和的Future.
就像我说的那样,令人费解,但符合要求.此外,我不完全确定如果已经分组并且已经超时的uid会发生什么,但随后会出现具有相同uid的新值.