Spring Integration HTTP出站网关基于回复内容重试

问题描述

我使用的API分两个步骤工作:

  1. 它将以异步方式开始处理文档,并为您提供您在第2步中使用的ID
  2. 它提供了一个端点,您可以在其中获取结果,但前提是要准备好结果。因此,基本上,它将始终为您提供200条响应,其中包含一些详细信息,例如处理状态。

所以问题是我如何为HTTP出站网关实现自定义“成功”标准。我还想将其与已经实现的RetryAdvice结合起来。

我尝试了以下操作,但是首先HandleMessageAdvice中提供的消息有效负载为空,其次,不触发重试:

.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
        ".0/read/analyzeResults/abc")
        .mappedRequestHeaders("Ocp-Apim-Subscription-Key")
        .httpMethod(HttpMethod.GET),c -> c.advice(this.advices.retryAdvice())
              .handleMessageAdvice(new AbstractHandleMessageAdvice() {
    @Override
    protected Object doInvoke(MethodInvocation invocation,Message<?> message) throws Throwable {
        String body = (String) message.getPayload();
        if (StringUtils.isEmpty(body))
            throw new RuntimeException("Still analyzing");
        JSONObject document = new JSONObject(body);
        if (document.has("analyzeResult"))
            return message;
        else
            throw new RuntimeException("Still analyzing");
    }
}))

我从4年前就已经从Artem找到了这个答案,但首先我没有在出站网关上找到回复渠道的方法,其次我不确定在新版本的Spring Integaration中这种情况是否已得到改善:http outbound retry with conditions (For checker condition)

更新

按照Artem的建议,我有以下内容

.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
        ".0/read/analyzeResults/abc")
        .mappedRequestHeaders("Ocp-Apim-Subscription-Key")
        .httpMethod(HttpMethod.GET),c -> c.advice(advices.verifyReplySuccess())
        .advice(advices.retryUntilRequestCompleteAdvice()))

建议:

@Bean
public Advice verifyReplySuccess() {
    return new AbstractRequestHandlerAdvice() {
        @Override
        protected Object doInvoke(ExecutionCallback callback,Object target,Message<?> message) {
            try {
                Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
                String body = (String) ((ResponseEntity) payload).getBody();
                JSONObject document = new JSONObject(body);
                if (document.has("analyzeResult"))
                    return message;
            } catch (JSONException e) {
                throw new RuntimeException(e);
            }
            throw new RuntimeException("Still analyzing");
        }
    };
}

但是现在当我调试 doInvoke 方法时,有效载荷的主体为 null 。奇怪的是,当我使用Postman执行相同的GET请求时,正确返回了正文。有想法吗?

使用Postman进行回复的正文如下:

{
    "status": "succeeded","createdDateTime": "2020-09-01T10:55:52Z","lastUpdatedDateTime": "2020-09-01T10:55:57Z","analyzeResult": {
        "version": "3.0.0","readResults": [
            {
                "page": 1,........

这是我使用回调从出站网关获得的有效负载

enter image description here

<200,[transfer-encoding:"chunked",Content-Type:"application/json; charset=utf-8",x-envoy-upstream-service-time:"27",CSP-Billing-Usage:"CognitiveServices.ComputerVision.Transaction=1",apim-request-id:"a503c72f-deae-4299-9e32-625d831cfd91",Strict-Transport-Security:"max-age=31536000; includeSubDomains; preload",x-content-type-options:"nosniff",Date:"Tue,01 Sep 2020 19:48:36 GMT"]>

解决方法

Java DSL中确实没有requestreply通道选项,因为您只需将handle()包装到channel()配置中,或者仅以自然流方式将端点链接起来即可,他们将使用两者之间的隐式直接渠道交换消息。您可以在XML配置中将Java DSL IntegrationFlow视为<chain>

您的建议配置有点错误:您需要将自定义建议声明为链中的第一个,因此当从那里引发异常时,将尝试重试。

您还应该考虑实现AbstractRequestHandlerAdvice以使其与RequestHandlerRetryAdvice逻辑保持一致。

您在此处实现doInvoke(),调用ExecutionCallback.execute()并分析结果以按原样返回或引发所需的异常。调用HttpRequestExecutingMessageHandler的结果将是AbstractIntegrationMessageBuilder,可能是ResponseEntity作为payload,以检查您的进一步逻辑。

,

按照Artem的建议,我提出了以下建议(其他技巧是将 expectedResponseType 设置为String,否则使用 ResponseEntity 主体为空):

.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
        ".0/read/analyzeResults/abc")
        .mappedRequestHeaders("Ocp-Apim-Subscription-Key")
        .httpMethod(HttpMethod.GET).expectedResponseType(String.class),c -> c.advice(advices.retryUntilRequestCompleteAdvice())
              .advice(advices.verifyReplySuccess()))

建议:

@Bean
public Advice verifyReplySuccess() {
    return new AbstractRequestHandlerAdvice() {
        @Override
        protected Object doInvoke(ExecutionCallback callback,Object target,Message<?> message) {
            Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
            if (((String) payload).contains("analyzeResult"))
                return payload;
            else
                throw new RuntimeException("Still analyzing");
        }
    };
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...