问题描述
我想将Monix Observable与Doobie(fs2)流一起使用,但似乎无法使其正常工作。没有流媒体,我的测试应用程序可以正常退出,但是在使用流媒体之后,我的TaskApp似乎挂起了关机状态,无法弄清原因。
这是重现该问题的最小示例:
package example
import java.util.concurrent.Executors
import doobie.implicits._
import cats.effect.{Blocker,ContextShift,ExitCode,Resource}
import doobie.hikari.HikariTransactor
import monix.eval.{Task,TaskApp}
import com.typesafe.scalalogging.StrictLogging
import fs2.interop.reactivestreams._
import monix.reactive.Observable
import scala.concurrent.ExecutionContext
object Hello extends TaskApp with StrictLogging {
private def resources()(implicit contextShift: ContextShift[Task]): Resource[Task,Resources] = {
for {
transactor <- Database.transactor("org.postgresql.Driver","jdbc:postgresql://localhost/fubar","fubar","fubar")
} yield Resources(transactor)
}
def run(args: List[String]): Task[ExitCode] = resources().use(task)
.flatMap(_ => Task { println("All Done!") })
.flatMap(_ => Task(ExitCode.Success))
def task(resources: Resources): Task[Unit] = {
val publisher =
sql"""select id from message;"""
.query[(Long)]
.stream
.transact(resources.transactor)
.toUnicastPublisher()
Observable.fromReactivePublisher(publisher)
.foreachL(id => logger.info(id.toString))
}
}
case class Resources(transactor: HikariTransactor[Task])
object Database {
val ecBlocking = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
def transactor(dbDriver: String,dbUrl: String,dbUser: String,dbPassword: String)(implicit contextShift: ContextShift[Task]): Resource[Task,HikariTransactor[Task]] = {
HikariTransactor.newHikariTransactor[Task](dbDriver,dbUrl,dbUser,dbPassword,ecBlocking,Blocker.liftExecutionContext(ecBlocking))
}
}
根据Monix文档:https://monix.io/docs/current/reactive/observable.html#fs2
,我已将fs2流转换为可观察到的Monix。我是否需要以某种方式关闭fs2流或Observable才能使应用程序干净退出? 感谢所有可以正常使用的提示或如何正确调试的提示。
解决方法
问题是ExecutionContext
需要关闭。参见作者的答案here。
可以看到正确用法in the documentation。