问题描述
我正在尝试使用Kotlin中的actor builder结构。我已经编写了以下代码来发送和接收来自演员的消息。
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
class GreetingsMessage(val to: String,val greetings: CompletableDeferred<String>)
fun Coroutinescope.newGreeter(greet: String) = actor<GreetingsMessage> {
channel.consumeEach {
it.greetings.complete("$greet ${it.to}")
}
}
fun main() {
runBlocking {
val greeter = newGreeter("Hello")
val greetingsMessage = GreetingsMessage("World",CompletableDeferred())
launch(dispatchers.Default) {
greeter.send(greetingsMessage)
}
launch(dispatchers.Default) {
println(greetingsMessage.greetings.await())
greeter.close()
}
}
}
此代码可以正常工作。 但是下面的代码不是,因为它正在挂程序。
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
class GreetingsMessage(val to: String,val greetings: CompletableDeferred<String>)
suspend fun newGreeter(greet: String) = coroutinescope {
actor<GreetingsMessage> {
channel.consumeEach {
it.greetings.complete("$greet ${it.to}")
}
}
}
fun main() {
runBlocking {
val greeter = newGreeter("Hello")
val greetingsMessage = GreetingsMessage("World",CompletableDeferred())
launch(dispatchers.Default) {
greeter.send(greetingsMessage)
}
launch(dispatchers.Default) {
println(greetingsMessage.greetings.await())
greeter.close()
}
}
}
通过将newGreeter函数用作暂停函数并通过coroutinescope封闭该函数,对代码进行了少许修改,对newGreeter方法的调用将阻塞线程并挂起程序。我相信newGreeter作为Coroutinescope的扩展功能和包含在coroutinescope中的暂停功能应该完全相同。
我尝试了与Produce函数相同的操作,并且在这里我也发现暂停函数以获取ReceieveChannel的调用阻塞了线程,而用作扩展函数的相同Produce结构按预期工作了
此代码不阻塞
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.Coroutinescope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutinescope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun Coroutinescope.produceIntegers(n: Int) = produce<Int> {
for (i in 1..n)
send(i)
close()
}
fun main() {
runBlocking {
val intChan = produceIntegers(10)
launch {
for (i in intChan)
println(i)
}
}
}
package com.byteobject.prototype.kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutinescope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
suspend fun produceIntegers(n: Int) = coroutinescope {
produce<Int> {
for (i in 1..n)
send(i)
close()
}
}
fun main() {
runBlocking {
val intChan = produceIntegers(10)
launch {
for (i in intChan)
println(i)
}
}
}
解决方法
问题在于coroutineScope { }
创建了一个新的阻止范围(结构化并发)-等待所有已启动的协程完成。
该函数在给定的块及其所有子协程完成后立即返回。
另一方面,扩展功能仅使用上下文(接收方)中的协程镜。