问题描述
我正在将目前在 GlobalScope
上启动协程的一些 Kotlin 代码重构为一种基于并发的结构化方法。我需要在 JVM 退出之前加入在我的代码中启动的所有作业。我的类可以分解为如下界面:
interface AsyncTasker {
fun spawnJob(arg: Long)
suspend fun joinAll()
}
用法:
fun main(args: Array<String>) {
val asyncTasker = createAsyncTasker()
asyncTasker.spawnJob(100)
asyncTasker.spawnJob(200)
asyncTasker.spawnJob(300)
asyncTasker.spawnJob(500)
// join all jobs as they'd be killed when the JVM exits
runBlocking {
asyncTasker.joinAll()
}
}
我基于 GlobalScope
的实现如下所示:
class GlobalScopeAsyncTasker : AsyncTasker {
private val pendingJobs = mutableSetOf<Job>()
override fun spawnJob(arg: Long) {
var job: Job? = null
job = GlobalScope.launch(Dispatchers.IO) {
someSuspendFun(arg)
pendingJobs.remove(job)
}
pendingJobs.add(job)
}
override suspend fun joinAll() {
// iterate over a copy of the set as the
// jobs remove themselves from the set when we join them
pendingJobs.toSet().joinAll()
}
}
显然,这并不理想,因为跟踪每个待处理的工作并不是很优雅,而且是旧的基于线程的编码范例的残余。
作为更好的方法,我正在创建自己的 CoroutineScope
,用于启动所有子项,提供一个 SupervisorJob
。
class StructuredConcurrencyAsyncTasker : AsyncTasker {
private val parentJob = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.IO + parentJob)
override fun spawnJob(arg: Long) {
scope.launch {
someSuspendFun(arg)
}
}
override suspend fun joinAll() {
parentJob.complete() // <-- why is this needed??
parentJob.join()
}
}
最初开发此解决方案时,我省略了对 parentJob.complete()
的调用,这导致 join()
无限期挂起。这感觉很不直观,所以我正在寻找确认/输入这是否是解决此类问题的正确方法。为什么我必须手动 complete()
父作业?有没有更简单的方法来解决这个问题?
Kotlin playground with the code
解决方法
来自Job#join()
的{{3}}:
此调用在因任何原因完成作业时恢复 [...]
由于我从未将父作业标记为 Completed
,因此 join
永远不会返回,即使该作业的所有子作业都是 Completed
。
考虑到作业永远无法将状态从 Completed
切换回 Active
,因此如果在所有子项都为 {{1} 时它自动将状态切换为 Completed
,这是有道理的},以后将无法添加更多子作业。
感谢 documentation 为我指明了正确的方向。
,我想知道这种行为将来是否会改变。目前,链接问题中的答案仍然成立。目前 parentJob.join()
不加入其子项。对我来说,Job#join()
-documentation 的以下部分是深入挖掘的原因:
请注意,只有当其所有子项都完成时,作业才会完成。
请注意,启动的协程作业可能处于除 completed
之外的其他状态。您可能希望在 parentJob.children.forEach { println(it) }
语句之前通过类似 parentJob.join()
(或您想检查或调试它的任何信息 ;-)) 之类的内容来验证这一点。
有(至少?)两种方法可以确保所有启动的子协程作业都完成,这样它就不会在那时挂起或过早完成:
-
等待所有子作业首先完成(如评论中的链接答案中所述),即:
parentJob.children.forEach { it.join() }
这不需要额外的
parentJob.join()
或parentJob.complete()
,因此可能是首选?parentJob
将在其所有子项完成时完成。 -
在调用
complete
之前先调用join
,即:parentJob.complete() parentJob.join()
请注意,此处调用
complete
只是将状态转换为 compleing,正如Job
documentation 中所述。在 completing 状态下,它也将等待其子项完成。如果您只调用complete()
而没有join
程序可能会退出,甚至在运行您启动的协程作业之前。如果您只join()
,它可能会像您已经经历过的那样无限期暂停。