SqsAckSink 使 Akka 流永远挂起导致图重新启动

问题描述

我正在尝试使用 SQS 源和使用 Localstack 进行测试来实现一个简单的工作流程。 如果我添加 SqsAckSink,我无法使它工作,它也不适用于 SqsAckFlow。但是,如果我删除 SqsAckSink 并只使用 Sink.seq(),则测试通过。拥有 SqsAckSinkSqsAckFlow 会使测试永远挂起。我还在测试中启用了调试,看到同样的错误一次又一次地重复,使图形重新启动,但这对我来说没有多大意义。我在代码片段之后发布以下错误消息。

代码是:

    public class DefaultWorkflow implements Workflow {

  private Function<Throwable,Supervision.Directive> errorStrategy;
  private final ActorSystem actorSystem;
  private FlowMonitor<ProvisioningResult> workflowMonitor;
  private final String queueUrl;
  private final SqsAsyncclient asyncclient;

  @Inject
  public DefaultWorkflow(ActorSystem actorSystem,String queueUrl) {

    this.errorStrategy = exc -> (Supervision.Directive) Supervision.resume();
    this.actorSystem = actorSystem;
    this.queueUrl = queueUrl;

    asyncclient =
        SqsAsyncclient.builder()
            .region(Region.of(Localstack.getDefaultRegion()))
            .httpClient(AkkaHttpClient.builder().withActorSystem(actorSystem).build())
            .endpointOverride(new URI(queueUrl))
            .build();
    doWork();
  }

  private Flow<Message,ProvisioningResult,NotUsed> buildFlow() {
    return Flow.of(Message.class)
        .via(Flow.of(Message.class).map(m -> ProvisioningResult.builder().body(m.body()).build()));
  }

  @Override
  public Source<ProvisioningResult,FlowMonitor<ProvisioningResult>> getSource() {
    Source<Message,NotUsed> sqsSource =
        RestartSource.onFailuresWithBackoff(
            Duration.ofSeconds(1),Duration.ofSeconds(2),0.1,this::createSQSSource);
    return sqsSource
        .via(buildFlow())
        .withAttributes(ActorAttributes.withSupervisionStrategy(errorStrategy))
        .monitorMat(Keep.right());
  }

  private Source<Message,NotUsed> createSQSSource() {
    SqsSourceSettings sqsSourceSettings = SqsSourceSettings.create().withMaxBatchSize(1);
    return SqsSource.create(queueUrl,sqsSourceSettings,asyncclient);
  }

  @Override
  public FlowMonitor<ProvisioningResult> getWorkflowMonitor() {
    return this.workflowMonitor;
  }

  private void doWork() {
    Pair<FlowMonitor<ProvisioningResult>,CompletionStage<Done>> run =
        getSource()
            .toMat(SqsAckSink.create(queueUrl,SqsAckSettings.create(),asyncclient),Keep.both())
            .run(actorSystem);
    workflowMonitor = run.first();
  }
}

测试如下:

  @Test
  public void getSource_givenMessage_shouldProduceResult()
      throws InterruptedException,ExecutionException,TimeoutException,URISyntaxException {
    String sqsName = "sqs2";
    String messageBody = "someMessage";
    String sqsUrl = initSQS(sqsName);
    generateSourceData(sqsUrl,messageBody);

    this.defaultWorkflow = new DefaultWorkflow(this.actorSystem,sqsUrl);

    Source<ProvisioningResult,FlowMonitor<ProvisioningResult>> source =
        defaultWorkflow.getSource();
    final CompletionStage<List<ProvisioningResult>> future =
        source.take(1).runWith(Sink.seq(),materializer);
    final List<ProvisioningResult> result = future.toCompletableFuture().join();
    assertEquals(1,result.size());
    assertEquals(result.get(0).getBody(),messageBody);
  }

  public void generateSourceData(String queueUrl,String messageBody) {
    client
        .sendMessage(
            SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build())
        .join();
  }

  private void initClient() throws URISyntaxException {
    System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(),"false");
    AwsCredentials credentials = AwsBasicCredentials.create("somekey","somevalue");
    StaticCredentialsProvider provider = StaticCredentialsProvider.create(credentials);

    client =
        SqsAsyncclient.builder()
            .region(Region.of(Localstack.getDefaultRegion()))
            .httpClient(AkkaHttpClient.builder().withActorSystem(ActorSystem.create()).build())
            .credentialsProvider(provider)
            .endpointOverride(new URI(Localstack.INSTANCE.getEndpointSQS()))
            .build();
  }

  protected String initSQS(String queueName) throws URISyntaxException {
    initClient();
    client
        .createQueue(CreateQueueRequest.builder().queueName(queueName).build())
        .join()
        .queueUrl();
    GetQueueUrlResponse response = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).join();
    System.out.println("Using queue " + response);
    log.info("Using queue {}",response);
    return response.queueUrl();
  }

当我启用调试时,我看到这个永远重复的错误

[info] [debug] s.a.a.c.i.ExecutionInterceptorChain - 创建一个 拦截器链将按以下顺序应用拦截器: [software.amazon.awssdk.awscore.interceptor.HelpfulUnkNownHostExceptionInterceptor@41a58812,software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor@4f626e56,software.amazon.awssdk.protocols.query.interceptor.QueryParametersToBodyInterceptor@4b5de362] [信息] [调试] s.a.a.c.i.ExecutionInterceptorChain - 拦截器 'software.amazon.awssdk.protocols.query.interceptor.QueryParametersToBodyInterceptor@4b5de362' 使用其 modifyHttpRequest 方法修改了消息。 [信息] [调试] s.a.a.request - 发送请求: DefaultSdkHttpFullRequest(httpMethod=POST,protocol=http,主机=本地主机,端口=4566,编码路径=, headers=[amz-sdk-invocation-id,内容长度,内容类型, User-Agent],queryParameters=[]) [info] [debug] s.a.a.a.s.Aws4Signer - 要签名的 AWS4 字符串:AWS4-HMAC-SHA256 [信息] 20201230T233655Z [信息] 20201230/us-east-1/sqs/aws4_request [信息] 5e95b0e67072e57434cb0da9516ef2ef4c747760bda87e9207161f9834d6dc01 [信息] [调试] s.a.a.request - 收到错误响应:500 [信息] [调试] s.a.a.request - 检测到可重试错误。将在 47ms 后重试。 请求尝试次数 2 [info] [debug] s.a.a.request - 正在重试 请求:DefaultSdkHttpFullRequest(httpMethod=POST,queryParameters=[]) [info] [debug] s.a.a.a.s.Aws4Signer - 要签名的 AWS4 字符串:AWS4-HMAC-SHA256 [信息] 20201230T233655Z [信息] 20201230/us-east-1/sqs/aws4_request [信息] ebce46865d8fface363b83481672fa6a3b7a11f584f2ea7c1e2b56e381e33afc [信息] [调试] s.a.a.request - 收到错误响应:500 [信息] [调试] s.a.a.request - 检测到可重试错误。将在 51 毫秒后重试。 请求尝试次数 3 [info] [debug] s.a.a.request - 正在重试 请求:DefaultSdkHttpFullRequest(httpMethod=POST,queryParameters=[]) [info] [debug] s.a.a.a.s.Aws4Signer - 要签名的 AWS4 字符串:AWS4-HMAC-SHA256 [信息] 20201230T233655Z [信息] 20201230/us-east-1/sqs/aws4_request [信息] 4085d30b4fa9cce89c435ed7a6404539a665bc2bfd4322fafd4095d2cb58fab1 [信息] [调试] s.a.a.request - 收到错误响应:500 [信息] [调试] s.a.a.request - 检测到可重试错误。将在 230 毫秒后重试。 请求尝试次数 4 [info] [debug] s.a.a.request - 正在重试 请求:DefaultSdkHttpFullRequest(httpMethod=POST,queryParameters=[]) [info] [debug] s.a.a.a.s.Aws4Signer - 要签名的 AWS4 字符串:AWS4-HMAC-SHA256 [信息] 20201230T233655Z [信息] 20201230/us-east-1/sqs/aws4_request [信息] 618037af1ce6008ce721fd3ca9cddde2e3f2893d50bd9ff5a241cc2061e1d67f [信息] [调试] s.a.a.request - 收到错误响应:500 [信息] [警告] a.s.s.RestartWithBackoffSource - 由于 失败。 stack_trace: [信息] java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.SqsException:null(服务: Sqs,状态代码:500,请求 ID:null) [info] at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:60) [信息] 在 software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51) [信息] 在 java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) [信息] 在 java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) [信息] 在 java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [信息] 在 java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) [信息] 在 software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) [信息] 在 java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [信息] 在 java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) [信息] 在 java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [信息] 在 java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) [信息] 在 software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$retryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) [信息] 在 software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$retryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) [信息] 在 software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$retryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:133) [信息] 在 java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [信息] 在 java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) [信息] 在 java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [信息] 在 java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) [信息] 在 software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$1(MakeAsyncHttpRequestStage.java:167) [信息] 在 java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [信息] 在 java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) [信息] 在 java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [信息] 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [信息] 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [信息] 在 java.base/java.lang.Thread.run(Thread.java:834) [信息] 引起:software.amazon.awssdk.services.sqs.model.SqsException: null(服务:Sqs,状态代码:500,请求 ID:null)[info] at software.amazon.awssdk.services.sqs.model.SqsException$BuilderImpl.build(SqsException.java:95) [信息] 在 software.amazon.awssdk.services.sqs.model.SqsException$BuilderImpl.build(SqsException.java:55) [信息] 在 software.amazon.awssdk.protocols.query.internal.unmarshall.AwsXmlErrorUnmarshaller.unmarshall(AwsXmlErrorUnmarshaller.java:97) [信息] 在 software.amazon.awssdk.protocols.query.unmarshall.AwsXmlErrorProtocolUnmarshaller.handle(AwsXmlErrorProtocolUnmarshaller.java:102) [信息] 在 software.amazon.awssdk.protocols.query.unmarshall.AwsXmlErrorProtocolUnmarshaller.handle(AwsXmlErrorProtocolUnmarshaller.java:82) [信息] 在 software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:88) [信息] 在 java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) [信息] 在 java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [信息] 在 java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) [信息] 在 software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:129) [信息] 在 akka.stream.impl.ReactiveStreamsCompliance$.tryOnComplete(ReactiveStreamsCompliance.scala:114) [信息] 在 akka.stream.impl.fusing.ActorgraphInterpreter$ActorOutputBoundary.complete(ActorgraphInterpreter.scala:390) [信息] 在 akka.stream.impl.fusing.ActorgraphInterpreter$ActorOutputBoundary.onUpstreamFinish(ActorgraphInterpreter.scala:416) [信息] 在 akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:523) [信息] 在 akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390) [信息] 在 akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorgraphInterpreter.scala:625) [信息] 在 akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorgraphInterpreter.scala:502) [信息] 在 akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorgraphInterpreter.scala:600) [信息] 在 akka.stream.impl.fusing.ActorgraphInterpreter.akka$stream$impl$fusing$ActorgraphInterpreter$$processEvent(ActorgraphInterpreter.scala:769) [信息] 在 akka.stream.impl.fusing.ActorgraphInterpreter$$anonfun$receive$1.applyOrElse(ActorgraphInterpreter.scala:784) [信息] 在 akka.actor.Actor.aroundReceive(Actor.scala:537) [信息] 在 akka.actor.Actor.aroundReceive$(Actor.scala:535) [信息] 在 akka.stream.impl.fusing.ActorgraphInterpreter.aroundReceive(ActorgraphInterpreter.scala:691) [信息] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577) [信息] 在 akka.actor.ActorCell.invoke(ActorCell.scala:547) [信息] 在 akka.dispatch.MailBox.processMailBox(MailBox.scala:27​​0) [信息] 在 akka.dispatch.MailBox.run(MailBox.scala:231) [信息] 在 akka.dispatch.MailBox.exec(MailBox.scala:243) [信息] 在 java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [信息] 在 java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [信息] 在 java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [信息] 在 java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [信息] 在 java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)