如何应对akka流中的异常上游故障

问题描述

我的akka​​ Flow[I,O]不受控制,因为它来自某些第三方代码。每当输入元素不产生输出元素时,我就需要做出反应(例如,由于流程的某些部分引发了异常)。为此,我需要产生故障的输入元素。我在流上没有找到任何API或类似的东西,它允许我注册处理程序或以任何方式对其作出反应。我该怎么办?

解决方法

当akka流引发异常时,您想Resume而不是Stop。收集所有成功的元素之后,您可以Seq#diff来说明由于抛出异常而删除了哪些元素。

import scala.concurrent.ExecutionContext
import scala.util.{Failure,Success}

object Exception {

  case class MyException(n: Int) extends RuntimeException

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Exception")
    implicit val ec: ExecutionContext = system.dispatcher

    val decider: Supervision.Decider = {
      case _: MyException => Supervision.Resume
      case _                      => Supervision.Stop
    }
    val flow = Flow[Int]
      .map(n =>
        if (n % 2 == 1) throw MyException(n)
        else n
      )
    val in = 1 to 10
    val outFuture = Source(in)
      .via(flow)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
    outFuture.onComplete {
      case Success(out) =>
        println("dropped elements are " + (in.diff(out)))
      case Failure(_) =>
        println("unknown failure")
    }
  }
}

控制台输出为:

dropped elements are Vector(1,3,5,7,9)

参考:How to get object that caused failure in Akka Streams?