我把Flow打算重新用作函数,所以我想保留它的签名:
流程[输入,输出,未使用]
现在,当我使用此流程时,我希望能够“调用”此流程并将结果保留在一边以便进一步处理.
所以我想从Flow发射[输入]开始,应用我的流程,然后继续流发射[(输入,输出)].
例:
val s: Source[Int,NotUsed] = Source(1 to 10) val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString) val via: Source[(Int,String),NotUsed] = ???
现在这不可能以一种简单的方式进行,因为将流与.via()结合起来会让我只发出流量[输出]
val via: Source[String,NotUsed] = s.via(stringIfEven)
另一种方法是使我的可重用流发出[(输入,输出)],但这需要每个流将其输入推送到所有阶段并使我的代码看起来很糟糕.
所以我想出了一个像这样的组合器:
def tupledFlow[In,Out](flow: Flow[In,Out,_]):Flow[In,(In,Out),NotUsed] = { Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val broadcast = b.add(Broadcast[In](2)) val zip = b.add(Zip[In,Out]) broadcast.out(0) ~> zip.in0 broadcast.out(1) ~> flow ~> zip.in1 FlowShape(broadcast.in,zip.out) })
}
即将输入广播到流并且直接在平行线中广播 – >两者都在’Zip’阶段,我将值加入元组.然后它可以优雅地应用:
val tupled: Source[(Int,NotUsed] = s.via(tupledFlow(stringIfEven))
一切都很棒但是当给定流量时正在进行“过滤”操作 – 这个组合器被卡住并停止处理更多事件.
我想这是由于’Zip’行为需要所有子流做同样的事情 – 在我的情况下,一个分支直接传递给定对象,所以另一个子流不能忽略这个元素. filter(),因为它确实 – 流停止,因为Zip正在等待推送.
有没有更好的方法来实现流量组成?
当’flow’忽略带’filter’的元素时,我可以在tupledFlow中做些什么来获得所需的行为吗?
解决方法
1)避免使用过滤阶段,将过滤器变为Flow [Int,Option [Int],NotUsed].这样您就可以像原始计划一样在整个图表周围应用压缩包装器.但是,代码看起来更加污染,并且通过传递Nones会增加额外开销.
val stringIfEvenOrNone = Flow[Int].map{ case x if x % 2 == 0 => Some(x.toString) case _ => None } val tupled: Source[(Int,NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{ case (num,Some(str)) => (num,str) }
2)分离过滤和转换阶段,并在压缩包装之前应用过滤阶段.可能是一个更轻量级和更好的妥协.
val filterEven = Flow[Int].filter(_ % 2 == 0) val toString = Flow[Int].map(_.toString) val tupled: Source[(Int,NotUsed] = s.via(filterEven).via(tupledFlow(toString))
编辑
3)根据评论中的讨论,为了清楚起见,在此处发布另一个解决方案.
此流包装器允许从给定流发出每个元素,与生成它的原始输入元素配对.它适用于任何类型的内部流程(每个输入发出0,1个或更多元素).
def tupledFlow[In,_]): Flow[In,NotUsed] = Flow[In].flatMapConcat(in => Source.single(in).via(flow).map( out => in -> out))