问题描述
使用Spring MVC,我有一个控制器,其端点返回SseEmitter。
@GetMapping(path = "/accept/{amount}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ApiResponses({
@ApiResponse(responseCode = "200",description = "OK"),@ApiResponse(responseCode = "409",description = "BUSY",content = @Content(schema = @Schema(implementation = MyErrorClass.class))),@ApiResponse(responseCode = "500",description = "UNEXPECTED_ERROR",content = @Content(schema = @Schema(implementation = MyErrorClass.class)))
})
public SseEmitter accept(@Parameter(description = "Amount to accept") @PathVariable double amount) throws MyException {
ExecutorService service = Executors.newCachedThreadPool();
SseEmitter emitter = new SseEmitter();
myService.accept(amount);
service.execute(() -> {
while(myService.inAcceptanceState()) {
try {
emitter.send(myService.getCurrentAmount());
Thread.sleep(1000);
} catch (InterruptedException | IOException ex) {
emitter.completeWithError(ex);
}
}
try {
emitter.send(myService.getCurrentAmount());
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
});
service.shutdown();
return emitter;
}
对于上面的代码,在第一次调用完成之前第二次调用accept()方法时,它会引发400错误。到目前为止,我得到以下信息:
2020-09-03 15:22:14.105 WARN 480 --- [nio-8080-exec-1] .m.m.a.ExceptionHandlerExceptionResolver : Failure in @ExceptionHandler com.mydomain.common.ExceptionController#handleMyException(Exception)
org.springframework.web.HttpMediaTypeNotAcceptableException: Could not find acceptable representation
这不会使我的springboot服务崩溃,但是该调用没有终止,因此导致明显的问题。我已经注意到,如果我更改为produces = MediaType.APPLICATION_JSON_STREAM_VALUE
,它将非常有效。正如我期望的那样,它返回异常和与该异常相关的错误消息,但是我们需要对SseEmitter使用TEXT_STREAM,并且我们的使用者期望这种类型的返回。从我的(有限的)理解来看,Spring在响应主体上发生了一些神奇的HttpConversion,因此,我该怎么做才能使用produces = MediaType.APPLICATION_JSON_STREAM_VALUE
返回异常?我需要编写自己的响应转换器,但是看起来...吗?
解决方法
您没有正确终止执行器。调用shutdown并不能达到您的预期。来自ExecutorService的javadoc中的示例。
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60,TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60,TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
通常,cachedThread池应该是共享资源,并且不应在端点内初始化。 cachedThreadPool的目的是利用线程重用,并避免失控地创建线程。如果确实需要单个线程,请使用newSingleThreadExecutor
我实际上不明白您为什么在这里使用执行器。它似乎没有增加任何价值。您只是阻塞了请求处理程序线程,以使其可以在原始线程中执行,而无法在另一个线程中执行代码。