问题描述
我在运行于 AWS ECS 的服务中使用 Akka Streams Typed 2.6.9 和 Akka HTTP 10.1.12,我的流式应用程序报出了错误消息“超出配置的最大打开请求数(max-open-requests)32”。我一直在寻找答案,相关资料(原文链接“this”)肯定已经读了十几遍。
应用程序从 AWS SQS 队列流式传输并使用此信息查询 REST 服务。
该应用程序“全程”都采用流式处理。根据文档(原文链接“here”),Http().superPool() 的实现应当支持背压。
既然如此,为什么 max-open-requests 还会被填满?难道不应该向上游(来自队列的连接)施加背压,而不是让连接池过载吗?
我尝试过对请求进行限流,把所有 mapAsync() 调用的并行度都改为 1,甚至在队列源头进行限流,但连接池最终还是会被填满,只是填满得慢一些。我做错了什么?
代码如下...
object ExampleActor {
// Protocol message handled by the behavior returned from `apply()`, which prints "Start up" on receipt.
object Start extends Protocol
// Logger for this object; used by the retry decision inside `runBookQueries`.
private val logger = LoggerFactory.getLogger(getClass)
/**
 * Builds the asynchronous SQS client, configured with default AWS credentials, the
 * US_EAST_1 region and an `AkkaHttpClient` bound to the given actor system.
 *
 * When the JVM system property `localstack` equals `"true"`, the client endpoint is
 * overridden to `http://localhost:4566`.
 *
 * The client's `close()` is registered to run when the classic actor system terminates.
 *
 * @param actorSystem typed actor system whose classic system backs the HTTP client
 * @return the built SQS client
 */
def buildAwsSqsClient(actorSystem: ActorSystem[Nothing]): SqsAsyncClient = {
  val classicSystem = actorSystem.classicSystem

  val awsSqsClientBuilder = SqsAsyncClient
    .builder()
    .credentialsProvider(DefaultCredentialsProvider.create())
    .region(Region.US_EAST_1)
    .httpClient(AkkaHttpClient.builder().withActorSystem(classicSystem).build())
  // Possibility to configure the retry policy
  // see https://doc.akka.io/docs/alpakka/current/aws-shared-configuration.html
  // .overrideConfiguration(...)

  if (System.getProperty("localstack", "false") == "true") {
    println("****Enabling localstack resource")
    // The returned builder is discarded, as in the original: this relies on the builder
    // being mutated in place.
    awsSqsClientBuilder.endpointOverride(new URI("http://localhost:4566"))
  }

  // Plain (non-implicit) val: nothing in this method needs the client implicitly.
  val awsSqsClient = awsSqsClientBuilder.build()
  classicSystem.registerOnTermination(awsSqsClient.close())
  awsSqsClient
}
/**
 * Creates the actor's behavior: on `Start` it prints "Start up" and keeps the same behavior.
 *
 * @return the behavior handling `Protocol` messages
 */
def apply(): Behavior[Protocol] =
  Behaviors.setup[Protocol] { _ =>
    // The setup context is not used; the message handler is returned directly.
    Behaviors.receiveMessage[Protocol] {
      case Start =>
        println("Start up")
        Behaviors.same
    }
  }
/**
 * Defines the SQS source and sinks for the "libraryPublish" / "libraryPublishPost" queues and
 * returns a flow that maps each incoming string through `retrieveLibrary` and `queryBooks`.
 *
 * NOTE(review): in the code visible here `deleteSink`, `fromWebService` and `endingSink` are
 * built but never connected to the returned flow or run, and `queryBooks` itself ends in
 * `runWith` — confirm where and how the graph is actually wired.
 *
 * @param context   actor context forwarded to `queryBooks`
 * @param sqsClient SQS client used implicitly by the Alpakka SQS stages
 * @param ex        execution context (not referenced directly in this method)
 */
def setupStream(context: ActorContext[Protocol])(implicit sqsClient: SqsAsyncClient, ex: ExecutionContext) = {
  // Turns every message into a delete action and acknowledges it on "libraryPublish".
  val deleteSink = Flow[Message]
    .map(MessageAction.delete)
    .log("Mapping to delete action for getting rid of message from queue")
    .to(SqsAckSink("libraryPublish", SqsAckSettings.create()))

  // Every message read is also sent to `deleteSink`, i.e. it is deleted as it is read rather
  // than after downstream processing has finished.
  // NOTE(review): SQS wait time is presumably expressed in whole seconds, so 500 ms may be
  // truncated to 0 — confirm against SqsSourceSettings.
  val fromWebService = SqsSource(
    "libraryPublish",
    SqsSourceSettings.create().withWaitTime(500.milliseconds)
  )
    .alsoTo(deleteSink)
    .map(_.body())

  // Serializes each command to JSON and publishes it to "libraryPublishPost".
  val endingSink = Flow[BookCommand]
    .map(group => SendMessageRequest.builder().messageBody(group.toJson.toString).build())
    .log("Building message to SQS")
    .to(SqsPublishSink.messageSink("libraryPublishPost"))

  Flow[String]
    .map(runGroupId => retrieveLibrary(runGroupId))
    .via(queryBooks(context))
}
/**
 * Converts each `Library` into a `LibraryQuery`, runs that query's book queries through
 * `runBookQueries` in a separate stream per library (at most 4 at a time), and passes the
 * resulting `LibraryAnswer` to `insertIntoDb`.
 *
 * NOTE(review): the `mapAsync` lambda returns a `LibraryAnswer` wrapping the pending
 * `Sink.seq` result, and the chain ends with `runWith(Sink.last)` on a flow — confirm the
 * types involved; this definition is reproduced as written.
 *
 * @param context actor context forwarded to `runBookQueries`
 */
def queryBooks(context: ActorContext[Protocol]) = {
  // One AuthorIndex per book; the search is chosen from the library's tags for every book.
  def toLibraryQuery(library: Library) =
    LibraryQuery(
      library.id,
      library.books.map { book =>
        AuthorIndex(
          book.author.lastName,
          library.tags match {
            case Subject(tagName)   => DeweySearch.createSearch(tagName)
            case Character(tagName) => CharacterSearch(tagName)
            case Publisher(tagName) => PublisherSearch("publisher:" + tagName)
          }
        )
      }
    )

  Flow[Library]
    .map(library => toLibraryQuery(library))
    .mapAsync(4) { libraryQuery =>
      // A fresh stream is run for every library query and collects all of its answers.
      val collectedAnswers = Source
        .single(libraryQuery)
        .mapConcat(_.bookQueries)
        .via(runBookQueries(context))
        .runWith(Sink.seq)
      LibraryAnswer(libraryQuery.id, collectedAnswers)
    }
    .map(libraryAnswer => insertIntoDb(libraryAnswer))
    .runWith(Sink.last)
}
/**
 * Builds the flow that sends each `BookQuery` as an authenticated JSON POST to the book
 * service and emits one `BookQueryAndAnswer` per answer streamed out of the response.
 *
 * Requests go through `Http().superPool`, wrapped in a `RetryFlow` that retries up to 3 times
 * (backoff from 10 ms to 5 s, no jitter) when the request fails or the response status is a
 * failure.
 *
 * NOTE(review): every call builds a new pool flow, and `queryBooks` runs one per library
 * query. Materializations presumably share the same per-host connection pool, so the
 * backpressure of one flow would not bound the pool's total open requests — likely relevant
 * to `max-open-requests` overflows; confirm against the Akka HTTP client documentation.
 *
 * @param context actor context supplying the execution context and actor system, and passed
 *                to `CachedTokenGetter.getToken`
 */
def runBookQueries(context: ActorContext[Protocol]) = {
  import scala.util.{Failure, Success}

  implicit val ec = context.executionContext
  implicit val sys = context.system.classicSystem

  val httpFlow: Flow[(HttpRequest, BookQuery), (Try[HttpResponse], BookQuery), NotUsed] =
    Http().superPool[BookQuery]()

  val retryFlow: Flow[(HttpRequest, BookQuery), (Try[HttpResponse], BookQuery), NotUsed] =
    RetryFlow.withBackoff(
      minBackoff = 10.millis,
      maxBackoff = 5.seconds,
      randomFactor = 0d,
      maxRetries = 3,
      flow = httpFlow
    )(
      decideRetry = { (request, response) =>
        // Match on the Try instead of calling `.get`: a failed request carries no response,
        // so reading its status or draining its entity would throw instead of retrying.
        response._1 match {
          case Success(httpResponse) if httpResponse.status.isFailure =>
            logger.debug("retrying answer query from {} response", httpResponse.status)
            // The rejected response is not read downstream, so its entity is discarded here.
            httpResponse.discardEntityBytes()
            Some(request)
          case Success(_) =>
            logger.debug("Request went through...")
            None
          case Failure(cause) =>
            logger.debug("retrying answer query after failed request", cause)
            Some(request)
        }
      }
    )

  Flow[BookQuery]
    .mapAsync(4) { (query: BookQuery) =>
      // Cached token getter is a function that gets an API token for the book service
      CachedTokenGetter.getToken(context).map { token =>
        val request = Post("https://my.example.service")
          .withEntity(HttpEntity(ContentTypes.`application/json`, query.toJson.toString))
          .withHeaders(Authorization(OAuth2BearerToken(token.access_token)))
        (request, query)
      }
    }
    .via(retryFlow)
    .flatMapConcat { pair =>
      // Pair every answer with the query that produced it.
      getBookResponseFromPair(pair).map(answer => (pair._2, answer))
    }
    .map(pair => BookQueryAndAnswer(pair._1, pair._2))
}
/**
 * Streams the `BookAnswer`s contained in the HTTP response of a (response, query) pair.
 *
 * The response body is split into JSON objects, each object is unmarshalled to a
 * `BookQueryResponse` (up to 10 in parallel), and its `titles` are emitted one by one.
 *
 * A failed request yields a failed source carrying the original cause instead of throwing
 * while the source is being built.
 *
 * NOTE(review): the response status is not checked here, so a failure-status response that
 * reaches this method would be parsed like a successful one — confirm this is intended.
 *
 * @param pair the attempted HTTP response together with the query that produced it
 * @return a source of the answers found in the response body
 */
def getBookResponseFromPair(pair: (Try[HttpResponse], BookQuery)): Source[BookAnswer, Any] = {
  import scala.util.{Failure, Success}

  pair._1 match {
    case Success(response) =>
      response.entity.dataBytes
        // Int.MaxValue effectively removes the per-object size limit of the framing stage.
        .via(JsonFraming.objectScanner(Int.MaxValue))
        .mapAsync(10)(bytes => Unmarshal(bytes).to[BookQueryResponse])
        .mapConcat(_.titles)
    case Failure(cause) =>
      Source.failed(cause)
  }
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)