问题描述
我有一个 spring boot kotlin 应用程序,它创建到另一个 spring 应用程序的 Web 套接字连接,发送多个“订阅”消息,然后需要等待在 Web 套接字连接上每个订阅收到一个响应。在给定时间打开的订阅数量可能高达几千个。
我使用 CompletableFuture 和协程提出了一个基本的工作解决方案,如下所示。是否有更惯用或更简洁的方法来完成这项任务,或者这是一个很好的解决方案?任何改进建议表示赞赏。
// InputObject / ResponSEObject are generic placeholders
fun getItems(inputObjects: List<InputObject>): List<ResponSEObject> {
val ret: ConcurrentLinkedQueue<ResponSEObject> = ConcurrentLinkedQueue()
// create a completable future for each input object
val subscriptions: MutableMap<String,CompletableFuture<ResponSEObject>> = mutableMapOf()
inputObjects.forEach {
subscriptions[it.id] = CompletableFuture()
}
// create web socket client configured with a lambda handler to
// fulfill each subscription
// each responSEObject.id matches one inputObject.id
val client = createWebSocketClient({
try {
val responSEObject = objectMapper.readValue(it,ResponSEObject::class.java)
subscriptions[responSEObject.id]?.complete(responSEObject)
} catch (e: Exception) {
logger.warn("Exception reading data: ${e.message}")
}
})
runBlocking {
coroutinescope {
for (item in inputObjects) {
launch {
// create and send a subscribe request
client.sendMessage(createSubscribe(item.id))
// wait for each future to complete
// uses CompletableFuture extension await() from kotlinx-coroutines-jdk8
val result = subscriptions[item.id]?.await()
if (result != null) {
ret.add(result)
}
}
}
}
}
client.close()
return ret.toList()
}
编辑:我发现了一个类似的问题:How to pass result as it comes using coroutines?
哪些选项最有意义?
解决方法
fun getItems(inputObjects: List<InputObject>): List<ResponseObject> {
val subscriptions = ids.associateTo(mutableMapOf()) { it.id to CompletableFuture<ResponseObject>() }
val client = createWebSocketClient({
try {
val responseObject = objectMapper.readValue(it,ResponseObject::class.java)
subscriptions[responseObject.id]?.complete(responseObject)
} catch (e: Exception) {
logger.warn("Exception reading data: ${e.message}")
}
})
return runBlocking(Dispatchers.IO) {
inputObjects
.mapNotNull {
client.sendMessage(createSubscribe(item.id))
subscriptions[item.id]?.await()
}
}
}