有没有更惯用的方式来执行订阅和异步/等待操作?

问题描述

我有一个 spring boot kotlin 应用程序,它创建到另一个 spring 应用程序的 Web 套接字连接,发送多个“订阅”消息,然后需要等待在 Web 套接字连接上每个订阅收到一个响应。在给定时间打开的订阅数量可能高达几千个。

我使用 CompletableFuture 和协程提出了一个基本的工作解决方案,如下所示。是否有更惯用或更简洁的方法来完成这项任务,或者这是一个很好的解决方案?任何改进建议表示赞赏。

// InputObject / ResponSEObject are generic placeholders
fun getItems(inputObjects: List<InputObject>): List<ResponSEObject> {
    val ret: ConcurrentLinkedQueue<ResponSEObject> = ConcurrentLinkedQueue()

    // create a completable future for each input object
    val subscriptions: MutableMap<String,CompletableFuture<ResponSEObject>> = mutableMapOf()
    inputObjects.forEach {
        subscriptions[it.id] = CompletableFuture()
    }

    // create web socket client configured with a lambda handler to
    // fulfill each subscription
    // each responSEObject.id matches one inputObject.id
    val client = createWebSocketClient({
        try {
            val responSEObject = objectMapper.readValue(it,ResponSEObject::class.java)
            subscriptions[responSEObject.id]?.complete(responSEObject)
        } catch (e: Exception) {
            logger.warn("Exception reading data: ${e.message}")
        }
    })

    runBlocking {
        coroutinescope {
            for (item in inputObjects) {
                launch {
                    // create and send a subscribe request
                    client.sendMessage(createSubscribe(item.id))

                    // wait for each future to complete
                    // uses CompletableFuture extension await() from kotlinx-coroutines-jdk8
                    val result = subscriptions[item.id]?.await()
                    if (result != null) {
                        ret.add(result)
                    }
                }
            }
        }
    }

    client.close()

    return ret.toList()
}

编辑:我发现了一个类似的问题:How to pass result as it comes using coroutines?

哪些选项最有意义?

解决方法

fun getItems(inputObjects: List<InputObject>): List<ResponseObject> {
    val subscriptions = ids.associateTo(mutableMapOf()) { it.id to CompletableFuture<ResponseObject>() }

    val client = createWebSocketClient({
        try {
            val responseObject = objectMapper.readValue(it,ResponseObject::class.java)
            subscriptions[responseObject.id]?.complete(responseObject)
        } catch (e: Exception) {
            logger.warn("Exception reading data: ${e.message}")
        }
    })

    return runBlocking(Dispatchers.IO) {
      inputObjects
        .mapNotNull {
            client.sendMessage(createSubscribe(item.id))
            subscriptions[item.id]?.await()
        }
    }
}