scala – 如何对来自无限流的传入事件进行分组?

我有无数的事件:

(timestamp,session_uid,traffic)

...
(1448089943,session-1,10)
(1448089944,20)
(1448089945,session-2,50)
(1448089946,30)
(1448089947,10)
(1448089948,session-3,10)
...

这些事件我想按session_uid分组,并计算每个会话的流量总和.

我写了一个akka-streams流程,它可以很好地使用有限流使用groupBy(我的代码基于this示例来自cookbook).但是对于无限流,它将无法工作,因为groupBy函数应该处理所有传入流,并且只有在此之后才准备好返回结果.

我想我应该用超时实现分组,即如果我从最后一次超过5分钟没有收到指定stream_uid的事件,我应该返回此session_uid的分组事件.但是如何实现它只使用akka-streams?

解决方法

我提出了一个有点 gnarly解决方案,但我认为它完成了工作.

关键的想法是使用Source的keepAlive方法作为将触发完成的计时器.

要做到这一点,我们首先必须抽象一些数据.计时器需要从原始Source发送触发器或另一个元组值,因此:

sealed trait Data

object TimerTrigger extends Data
case class Value(tstamp : Long,session_uid : String,traffic : Int) extends Data

然后将我们的元组源转换为值的来源.我们仍然会使用groupBy进行类似于有限流情况的分组:

val originalSource : Source[(Long,String,Int),Unit] = ???

type IDGroup = (String,Source[Value,Unit]) //uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup,Unit] = 
  originalSource.map(t => Value(t._1,t._2,t._3))
                .groupBy(_.session_uid)

棘手的部分是处理只是元组的分组:(String,Source [Value,Unit]).我们需要计时器通知我们时间是否已经过去所以我们需要另一个抽象来知道我们是否仍在计算或者由于超时我们已完成计算:

sealed trait Sum {
  val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)

现在我们可以消耗每个组的来源.如果值源在timeOut之后没有产生某些内容,keepAlive将发送TimerTrigger.然后,来自keepAlive的数据与TimerTrigger或来自原始Source的新值进行模式匹配:

val evaluateSum : ((Sum,Data)) => Sum = {
  case (runningSum,data) => { 
    data match {
      case TimerTrigger => ComputedSum(runningSum.sum)
      case v : Value    => StillComputing(runningSum.sum + v.traffic)
    }
  }
}//end val evaluateSum

type SumResult = (String,Future[Int]) // uid -> Future of traffic sum for uid

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = 
  idGroup._1 -> idGroup._2.keepAlive(timeOut,() => TimerTrigger)
                          .scan(zeroSum)(evaluateSum)
                          .collect {case c : ComputedSum => c.sum}
                          .runWith(Sink.head)

该集合应用于仅与完成的总和匹配的部分函数,​​因此只有在计时器触发后才会到达Sink.

然后,我们将此处理程序应用于每个出现的分组:

val timeOut = FiniteDuration(5,MINUTES)

val sumSource : Source[SumResult,Unit] = 
  groupedDataSource map handleGroup(timeOut)

我们现在有一个Source(String,Future [Int]),它是session_uid,是该id的流量总和的Future.

就像我说的那样,令人费解,但符合要求.此外,我不完全确定如果已经分组并且已经超时的uid会发生什么,但随后会出现具有相同uid的新值.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...