如何正确加入 CoroutineScope 中启动的所有作业

问题描述

我正在将目前在 GlobalScope 上启动协程的一些 Kotlin 代码重构为一种基于并发的结构化方法。我需要在 JVM 退出之前加入在我的代码中启动的所有作业。我的类可以分解为如下界面:

interface AsyncTasker {
    fun spawnJob(arg: Long)
    suspend fun joinAll()
}

用法:

fun main(args: Array<String>) {
    val asyncTasker = createAsyncTasker()

    asyncTasker.spawnJob(100)
    asyncTasker.spawnJob(200)
    asyncTasker.spawnJob(300)
    asyncTasker.spawnJob(500)

    // join all jobs as they'd be killed when the JVM exits
    runBlocking {
        asyncTasker.joinAll()
    }
}

我基于 GlobalScope 的实现如下所示:

class GlobalScopeAsyncTasker : AsyncTasker {
    private val pendingJobs = mutableSetOf<Job>()

    override fun spawnJob(arg: Long) {
        var job: Job? = null
        job = GlobalScope.launch(Dispatchers.IO) {
            someSuspendFun(arg)
            pendingJobs.remove(job)
        }
        pendingJobs.add(job)
    }

    override suspend fun joinAll() {
        // iterate over a copy of the set as the
        // jobs remove themselves from the set when we join them
        pendingJobs.toSet().joinAll()
    }
}

显然,这并不理想,因为跟踪每个待处理的工作并不是很优雅,而且是旧的基于线程的编码范例的残余。

作为更好的方法,我正在创建自己的 CoroutineScope,用于启动所有子项,提供一个 SupervisorJob

class StructuredConcurrencyAsyncTasker : AsyncTasker {

    private val parentJob = SupervisorJob()
    private val scope = CoroutineScope(Dispatchers.IO + parentJob)

    override fun spawnJob(arg: Long) {
        scope.launch {
            someSuspendFun(arg)
        }
    }

    override suspend fun joinAll() {
        parentJob.complete() // <-- why is this needed??
        parentJob.join()
    }
}

最初开发此解决方案时,我省略了对 parentJob.complete() 的调用,这导致 join() 无限期挂起。这感觉很不直观,所以我正在寻找确认/输入这是否是解决此类问题的正确方法。为什么我必须手动 complete() 父作业?有没有更简单的方法来解决这个问题?

Kotlin playground with the code

解决方法

来自Job#join()的{​​{3}}:

此调用在因任何原因完成作业时恢复 [...]

由于我从未将父作业标记为 Completed,因此 join 永远不会返回,即使该作业的所有子作业都是 Completed

考虑到作业永远无法将状态从 Completed 切换回 Active,因此如果在所有子项都为 {{1} 时它自动将状态切换为 Completed,这是有道理的},以后将无法添加更多子作业。

感谢 documentation 为我指明了正确的方向。

,

我想知道这种行为将来是否会改变。目前,链接问题中的答案仍然成立。目前 parentJob.join() 不加入其子项。对我来说,Job#join()-documentation 的以下部分是深入挖掘的原因:

请注意,只有当其所有子项都完成时,作业才会完成。

请注意,启动的协程作业可能处于除 completed 之外的其他状态。您可能希望在 parentJob.children.forEach { println(it) } 语句之前通过类似 parentJob.join()(或您想检查或调试它的任何信息 ;-)) 之类的内容来验证这一点。

有(至少?)两种方法可以确保所有启动的子协程作业都完成,这样它就不会在那时挂起或过早完成:

  1. 等待所有子作业首先完成(如评论中的链接答案中所述),即:

    parentJob.children.forEach { it.join() }
    

    这不需要额外的 parentJob.join()parentJob.complete(),因此可能是首选? parentJob 将在其所有子项完成时完成。

  2. 在调用 complete 之前先调用 join,即:

    parentJob.complete()
    parentJob.join()
    

    请注意,此处调用 complete 只是将状态转换为 compleing,正如 Job documentation 中所述。在 completing 状态下,它也将等待其子项完成。如果您只调用 complete() 而没有 join 程序可能会退出,甚至在运行您启动的协程作业之前。如果您只join(),它可能会像您已经经历过的那样无限期暂停。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...