从coroutineScope内部创建actor会阻塞线程,但是与CoroutineScope扩展函数创建的actor却不同

问题描述

我正在尝试使用Kotlin中的actor builder结构。我已经编写了以下代码来发送和接收来自演员的消息。

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach

class GreetingsMessage(val to: String,val greetings: CompletableDeferred<String>)

fun Coroutinescope.newGreeter(greet: String) = actor<GreetingsMessage> {
        channel.consumeEach {
            it.greetings.complete("$greet ${it.to}")
        }
    }

fun main() {
    runBlocking {
        val greeter = newGreeter("Hello")
        val greetingsMessage = GreetingsMessage("World",CompletableDeferred())
        launch(dispatchers.Default) {
            greeter.send(greetingsMessage)
        }
        launch(dispatchers.Default) {
            println(greetingsMessage.greetings.await())
            greeter.close()
        }
    }
}

代码可以正常工作。 但是下面的代码不是,因为它正在挂程序。

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach

class GreetingsMessage(val to: String,val greetings: CompletableDeferred<String>)

suspend fun newGreeter(greet: String) = coroutinescope {
    actor<GreetingsMessage> {
        channel.consumeEach {
            it.greetings.complete("$greet ${it.to}")
        }
    }
}

fun main() {
    runBlocking {
        val greeter = newGreeter("Hello")
        val greetingsMessage = GreetingsMessage("World",CompletableDeferred())
        launch(dispatchers.Default) {
            greeter.send(greetingsMessage)
        }
        launch(dispatchers.Default) {
            println(greetingsMessage.greetings.await())
            greeter.close()
        }
    }
}

通过将newGreeter函数用作暂停函数并通过coroutinescope封闭该函数,对代码进行了少许修改,对newGreeter方法调用将阻塞线程并挂起程序。我相信newGreeter作为Coroutinescope的扩展功能和包含在coroutinescope中的暂停功能应该完全相同。

我想知道两种方法间的区别,以及为什么第二种方法挂起程序。

我尝试了与Produce函数相同的操作,并且在这里我也发现暂停函数获取ReceieveChannel的调用阻塞了线程,而用作扩展函数的相同Produce结构按预期工作了

代码不阻塞

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.Coroutinescope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutinescope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun Coroutinescope.produceIntegers(n: Int) = produce<Int> {
        for (i in 1..n)
            send(i)
        close()
    }

fun main() {
    runBlocking {
        val intChan = produceIntegers(10)
        launch {
            for (i in intChan)
                println(i)
        }
    }
}

由于这阻塞了对generateIntegers方法调用

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutinescope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun produceIntegers(n: Int) = coroutinescope {
    produce<Int> {
        for (i in 1..n)
            send(i)
        close()
    }
}

fun main() {
    runBlocking {
        val intChan = produceIntegers(10)
        launch {
            for (i in intChan)
                println(i)
        }
    }
}

解决方法

问题在于coroutineScope { } 创建了一个新的阻止范围(结构化并发)-等待所有已启动的协程完成。

请参阅:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html

该函数在给定的块及其所有子协程完成后立即返回。

另一方面,扩展功能仅使用上下文(接收方)中的协程镜。