Why does blocking work with thenApplyAsync but not with thenApply?

Problem description

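We are seeing some interesting behavior in our application. The following Spock spec captures it. I am trying to understand why the second test succeeds while the first one ends in a TimeoutException.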

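Summary: A mock server exposes a mock endpoint that responds successfully after a 10 ms delay. We use AsyncHttpClient to make a non-blocking call to this endpoint. That first call is chained with a second, blocking call to the same endpoint. With thenApply, the first call succeeds but the second call fails with a timeout; with thenApplyAsync, both calls succeed. In both cases the mock server appears to respond within 10 ms.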
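
The difference between the two chaining methods can be made visible without any HTTP client. The following is a minimal JDK-only sketch, not the code from our application: the single-threaded executor and the thread name "fake-event-loop" are hypothetical stand-ins for whichever thread completes the AsyncHttpClient future. It only shows which thread runs each callback when the callback is registered before the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackThreadDemo {

    // Hypothetical name standing in for a client I/O thread.
    static final String LOOP_NAME = "fake-event-loop";

    /** Returns { thread that ran the thenApply callback, thread that ran the thenApplyAsync callback }. */
    static String[] callbackThreads() {
        ExecutorService loop = Executors.newSingleThreadExecutor(r -> new Thread(r, LOOP_NAME));
        try {
            CompletableFuture<String> first = new CompletableFuture<>();
            // Both callbacks are registered before the future completes.
            CompletableFuture<String> sync = first.thenApply(v -> Thread.currentThread().getName());
            CompletableFuture<String> async = first.thenApplyAsync(v -> Thread.currentThread().getName());
            // Complete the future from the stand-in I/O thread.
            loop.execute(() -> first.complete("done"));
            return new String[] {sync.join(), async.join()};
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = callbackThreads();
        System.out.println("thenApply ran on:      " + names[0]);
        System.out.println("thenApplyAsync ran on: " + names[1]);
    }
}
```

In this sketch the thenApply callback runs on the thread that called complete(), while the thenApplyAsync callback is handed to the default asynchronous executor (normally ForkJoinPool.commonPool()). Whether AsyncHttpClient completes its futures the same way is an assumption here, not something the spec proves.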

Dependencies:


    implementation 'com.google.guava:guava:29.0-jre'
    implementation 'org.asynchttpclient:async-http-client:2.12.1'

    // Use the latest Groovy version for Spock testing
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.11'

    // Use the awesome Spock testing and specification framework even with Java
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.objenesis:objenesis:1.4'
    testImplementation "cglib:cglib:2.2"
    testImplementation 'junit:junit:4.13'
    testImplementation 'org.mock-server:mockserver-netty:5.11.1'

Spock spec:


package com.switchcase.asyncthroughput

import com.google.common.base.Charsets
import org.asynchttpclient.DefaultAsyncHttpClient
import org.asynchttpclient.RequestBuilder
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.HttpResponse
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import static org.mockserver.integration.ClientAndServer.startClientAndServer
import static org.mockserver.model.HttpRequest.request

class CompletableFutureThreadsTest extends Specification {

    @Shared
    ClientAndServer mockServer

    def asyncHttpClient = new DefaultAsyncHttpClient();

    def setupSpec() {
        mockServer = startClientAndServer(9192);
        // Create a mock endpoint that responds with "done" after a 10 ms delay.
        mockServer.when(request()
                .withMethod("POST")
                .withPath("/validate"))
                .respond(HttpResponse.response().withBody("done")
                        .withStatusCode(200)
                        .withDelay(TimeUnit.MILLISECONDS,10));
    }

    def "Calls external using AHC with a blocking call with 1sec timeout results in TimeoutException."() {
        when:
        callExternal().thenApply({ resp -> callExternalBlocking() }).join()

        then:
        def exception = thrown(CompletionException)
        exception instanceof CompletionException
        exception.getCause() instanceof TimeoutException
        exception.printStackTrace()
    }

    def "Calls external using AHC with a blocking call on ForkJoinPool with 1sec timeout results in success."() {
        when:
        def value = callExternal().thenApplyAsync({ resp -> callExternalBlocking() }).join()

        then:
        value == "done"
    }

    def cleanupSpec() {
        mockServer.stop(true)
    }

    private CompletableFuture<String> callExternal(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternal Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        })
    }

    private String callExternalBlocking(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternalBlocking Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        }).join()
    }
}

Edit:

Debug log and stack trace for the timeout (the timeout occurs on the remote call inside callExternalBlocking):

17:37:38.885 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.timeout.TimeoutTimerTask - Request timeout to localhost/127.0.0.1:9192 after 1000 ms for NettyResponseFuture{currentRetry=0,isDone=0,isCancelled=0,asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed,1 dependents],uri=http://localhost:9192/validate,keepAlive=true,redirectCount=0,timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,inAuth=0,touch=1622248657866} after 1019 ms
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.channel.ChannelManager - Closing Channel [id: 0x5485056c,L:/127.0.0.1:58076 - R:localhost/127.0.0.1:9192] 
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.request.NettyRequestSender - Aborting Future NettyResponseFuture{currentRetry=0,touch=1622248657866}

java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273)
    at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473)
    at org.asynchttpclient.netty.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:43)
    at org.asynchttpclient.netty.timeout.RequestTimeoutTimerTask.run(RequestTimeoutTimerTask.java:50)
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    ... 7 more
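
One possible reading of the log above, offered as a hypothesis rather than a confirmed diagnosis: if the thenApply callback runs on the same thread that would have to deliver the second response, then blocking in that callback leaves nobody to deliver it, and only the timer thread can end the wait. The JDK-only sketch below models that situation. The single-threaded "loop" executor, the callFromLoopThread helper, and the 500 ms timeout are all hypothetical; no AsyncHttpClient code is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class BlockedLoopDemo {

    /**
     * Chains a blocking "second call" onto a first future that is completed by a
     * single loop thread. The second call can only finish on that same loop thread.
     */
    static String callFromLoopThread(boolean offload) {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> first = new CompletableFuture<>();

            Function<String, String> blockingCall = v -> {
                // The "response" for the second call must be produced by the loop thread.
                CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "done", loop);
                try {
                    return second.get(500, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "timeout";
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            };

            // thenApply: callback runs on the loop thread and blocks it.
            // thenApplyAsync: callback runs elsewhere, so the loop thread stays free.
            CompletableFuture<String> chained =
                    offload ? first.thenApplyAsync(blockingCall) : first.thenApply(blockingCall);

            loop.execute(() -> first.complete("done"));
            return chained.join();
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply:      " + callFromLoopThread(false));
        System.out.println("thenApplyAsync: " + callFromLoopThread(true));
    }
}
```

In this model the thenApply variant times out because the task that would complete the second future is queued behind the callback that is waiting for it. The thenApplyAsync variant finishes because the wait happens on another thread. This mirrors the two test outcomes, but it does not establish that AsyncHttpClient schedules its work this way.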
