如何在当前消息后关闭 Spring Boot Kafka 流处理应用程序

问题描述

在使用 Spring Cloud Function 的 Spring Boot Kafka 流处理应用程序中,如何关闭应用程序(以停止接收任何进一步的消息)但仅在响应当前消息之后?

场景是这样的:我们有一个应用程序来处理包含对某个文件的引用的消息。我们通过使用一些第三方库询问给定文件内容并使用我们从文件提取的一些数据进行响应来处理这个问题。在极少数情况下,某些文件可能会导致外部库挂起。所以我们在后台线程上调用这些库,如果时间太长就会超时。我们需要为此消息响应 Kafka(使用一些 JSON 详细说明错误),以便 Kafka 不会将它发送到我们应用程序的任何其他实例(因为它也可能导致该实例挂起)。但是我们随后希望我们的 Spring Boot 应用程序的这个实例关闭,因为我们无法从 3rd 方库中的挂起中完全恢复(否则我们可能会获得资源或内存泄漏)。另一个实例将被 Kubernetes 或 Docker Swarm 或其他任何东西自动启动。

我不确定它是否相关,但我们正在使用 Spring Cloud Function,并且我们正在加入两个流:“文件”流,其中每条消息都包含对要处理的文件的引用,以及一个 GlobalKTable一些配置信息。代码如下(在 Kotlin 中):

    // The service we use to run the process on a background thread
    private val executorService = Executors.newFixedThreadPool(1)

    @Bean
    fun process() = BiFunction<KStream<String,FileInfo>,GlobalKTable<String,Config>,KStream<String,FileInterrogationResult>> { fileStream,configTable ->

        requestStream.join(
            configTable,{ _,file -> file.configId },{ file,config ->
                try {
                    // Process the file using the 3rd party libraries.
                    val result = executorService.submit(theThirdPartyLibExtractionFunction)
                        .get(someTimeout,TimeUnit.MILLISECONDS)

                    // Success: return some FileInterrogationResult object wrapping the result from above.
                } catch (e: TimeoutException) {
                    // Return some FileInterrogationResult object with error details.
                    // Todo: Here we kNow that after this function completes the app should shut down. How do we do this?
                } catch (e: Throwable) {
                    // Return some FileInterrogationResult object with error details.
                }
            }
        )

解决方法

您可以使用 Actuator 端点来停止/启动/暂停/恢复以及监控单个绑定。 启用后,您可以简单地使用 curl 来管理绑定。 例如

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName

有关如何操作的确切说明here