使用Akka流从服务器下载二进制数据

问题描述

在使用流时,我开始将下载处理抽象为RunnableGraph

在此Graph中,我使用一个URL来开始下载过程,我从URL中请求一个HttpResponse并评估StatusCode。如果有一个Redirect响应,我的流程会将Location标头值提供给requestProcessor并尝试再次下载。 如果收到200,则我的进程会累积二进制并将其发送到接收器。

我的问题:here是我的外推语句,该接收器将永远不会收到任何数据:(

我希望我的自定义FlowShape写得正确,但可能存在隐藏的问题。

解决方法

getAsyncCallback适用于我的问题,jrudolph在评论中如何描述。

将代码更改为:

def onGrab(current: HttpResponse): Unit = {
   if (redirectCodes.contains(current.status) && current.header[Location].isDefined) {
     push[String](redirect,current.header[Location].get.value())
     current.entity.discardBytes()
     pull(in)
   } else {
     logger.debug("download . . .")
     current
       .entity
       .dataBytes
       .map(_.toArray)
       .runWith(Sink.head)
       .foreach(
         getAsyncCallback[Array[Byte]](value => push(out,value)).invoke
     )
   }
}