如何使用连接池让 Akka HTTP 背压?

问题描述

我在 AWS ECS 中运行的服务中运行 Akka Streams Typed 2.6.9 和 Akka HTTP 10.1.12,我在我的流应用程序中收到错误消息“超出配置的最大打开请求值 32” .我一直在寻找答案,现在肯定已经阅读了 this 十几遍。

应用程序从 AWS SQS 队列流式传输并使用此信息查询 REST 服务。
该应用程序正在“全程”流式传输。根据文档 hereHttp().superPool() 实现应该背压。如果是这样,为什么我要填写 max-open-requests?难道不应该对队列的连接进行背压,而不是使池过载吗?

我尝试过限制请求,将所有 mapAsync() 调用更改为一个线程,甚至从队列中进行限制,但池最终填满,但速度很慢。我做错了什么?

代码如下...

object ExampleActor {
  object Start extends Protocol

  private val logger = LoggerFactory.getLogger(getClass)

  def buildAwsSqsClient(actorSystem: ActorSystem[nothing]): SqsAsyncclient = {
    val awsCredentialsProvider = DefaultCredentialsProvider.create()
    val awsSqsClientBuilder = SqsAsyncclient
      .builder()
      .credentialsProvider(awsCredentialsProvider)
      .region(Region.US_EAST_1)
      .httpClient(AkkaHttpClient.builder().withActorSystem(actorSystem.classicSystem).build())
    // Possibility to configure the retry policy
    // see https://doc.akka.io/docs/alpakka/current/aws-shared-configuration.html
    // .overrideConfiguration(...)

    if(System.getProperty("localstack","false").equals("true")) {
      println("****Enabling localstack resource")
      awsSqsClientBuilder.endpointOverride(new URI("http://localhost:4566"))
    }

    implicit val awsSqsClient = awsSqsClientBuilder.build()

    actorSystem.classicSystem.registerOnTermination(awsSqsClient.close())

    awsSqsClient
  }

  def apply(): Behavior[Protocol] =
    Behaviors.setup { ctx: ActorContext[Protocol] =>
      def process(): Behavior[Protocol] =
        Behaviors.receiveMessage {
          case Start =>
            println("Start up")
            Behaviors.same[Protocol]
        }

      process()
    }

  def setupStream(context: ActorContext[Protocol])(implicit sqsClient: SqsAsyncclient,ex: ExecutionContext) = {
    val deleteSink = Flow[Message]
      .map(MessageAction.delete)
      .log("Mapping to delete action for getting rid of message from queue")
      .to(SqsAckSink("libraryPublish",SqsAckSettings.create()))

    val fromWebService = SqsSource(
      "libraryPublish",SqsSourceSettings.create()
        .withWaitTime(500.milliseconds)
    )
      .alsoto(deleteSink)
      .map(_.body())

    val endingSink = Flow[BookCommand]
      .map(group => SendMessageRequest.builder().messageBody(group.toJson.toString).build())
      .log("Building message to SQS")
      .to(SqsPublishSink.messageSink("libraryPublishPost"))

    Flow[String]
      .map(runGroupId => retrieveLibrary(runGroupId))
      .via(queryBooks(context))
  }

  def queryBooks(context: ActorContext[Protocol]) = {
    Flow[Library]
      .map(library => {
        LibraryQuery(
          library.id,library.books.map(book =>
            AuthorIndex(
              book.author.lastName,library.tags match {
                case Subject(tagName) =>
                  DeweySearch.createSearch(tagName)
                case Character(tagName) =>
                  CharacterSearch(tagName)
                case Publisher(tagName) =>
                  PublisherSearch("publisher:" + tagName)
              }
            )
          )
        )
      })
      .mapAsync(4)(libraryQuery => {
          val seq = Source.single(libraryQuery)
            .mapConcat(_.bookQueries)
            .via(runBookQueries(context))
            .runWith(Sink.seq)

          LibraryAnswer(libraryQuery.id,seq)
        }
      )
      .map(libraryAnswer => insertIntoDb(libraryAnswer))
      .runWith(Sink.last)
  }

  def runBookQueries(context: ActorContext[Protocol]) = {
    implicit val ec = context.executionContext
    implicit val sys = context.system.classicSystem

    val httpFlow: Flow[(HttpRequest,BookQuery),(Try[HttpResponse],NotUsed] =
      Http().superPool[BookQuery]()

    val retryFlow: Flow[(HttpRequest,NotUsed] = {
      RetryFlow.withBackoff(minBackoff = 10.millis,maxBackoff = 5.seconds,randomFactor = 0d,maxRetries = 3,httpFlow)(
        decideretry = { (request,response) =>
          if(response._1.isFailure || response._1.get.status.isFailure) {
            logger.debug("retrying answer query from {} response",response._1.get.status)
            response._1.get.discardEntityBytes()
            Some(request)
          } else {
            logger.debug("Request went through...")
            None
          }
        }
      )
    }

    Flow[BookQuery]
      .mapAsync(4)((query: BookQuery) => {
        //Cached token getter is a function that gets an API token for the book service
        CachedTokenGetter.getToken(context).map(token =>
          (
            Post("https://my.example.service")
              .withEntity(httpentity(ContentTypes.`application/json`,query.toJson.toString))
              .withHeaders(Authorization(OAuth2BearerToken(token.access_token))),query
          ))
      })
      .via(retryFlow)
      .flatMapConcat(pair => {
        getBookResponseFromPair(pair).map(answer => (pair._2,answer))
      })
      .map(pair => BookQueryAndAnswer(pair._1,pair._2))
  }

  def getBookResponseFromPair(pair: (Try[HttpResponse],BookQuery)): Source[BookAnswer,Any] = {
    pair._1.get.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .mapAsync(10)(bytes => Unmarshal(bytes).to[BookQueryResponse])
      .mapConcat(_.titles)
  }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)