通过Doobie 0.9.0可以正确使用Monix 3.2.2

问题描述

我想将Monix Observable与Doobie(fs2)流一起使用,但似乎无法使其正常工作。没有流媒体,我的测试应用程序可以正常退出,但是在使用流媒体之后,我的TaskApp似乎挂起了关机状态,无法弄清原因。

这是重现该问题的最小示例:

package example

import java.util.concurrent.Executors

import doobie.implicits._
import cats.effect.{Blocker,ContextShift,ExitCode,Resource}
import doobie.hikari.HikariTransactor
import monix.eval.{Task,TaskApp}
import com.typesafe.scalalogging.StrictLogging
import fs2.interop.reactivestreams._
import monix.reactive.Observable

import scala.concurrent.ExecutionContext
 
object Hello extends TaskApp with StrictLogging {
    
  private def resources()(implicit contextShift: ContextShift[Task]): Resource[Task,Resources] = {
    for {
      transactor <- Database.transactor("org.postgresql.Driver","jdbc:postgresql://localhost/fubar","fubar","fubar")
    } yield Resources(transactor)
  }

  def run(args: List[String]): Task[ExitCode] = resources().use(task)
    .flatMap(_ => Task { println("All Done!") })
    .flatMap(_ => Task(ExitCode.Success))
  
  def task(resources: Resources): Task[Unit] = {

    val publisher =
      
      sql"""select id from message;"""

        .query[(Long)]
        .stream
        .transact(resources.transactor)
        .toUnicastPublisher()

    Observable.fromReactivePublisher(publisher)
      .foreachL(id => logger.info(id.toString))
    
  }
  
}

case class Resources(transactor: HikariTransactor[Task])

object Database {

  val ecBlocking = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

  def transactor(dbDriver: String,dbUrl: String,dbUser: String,dbPassword: String)(implicit contextShift: ContextShift[Task]): Resource[Task,HikariTransactor[Task]] = {
    HikariTransactor.newHikariTransactor[Task](dbDriver,dbUrl,dbUser,dbPassword,ecBlocking,Blocker.liftExecutionContext(ecBlocking))
  }

}

根据Monix文档:https://monix.io/docs/current/reactive/observable.html#fs2

,我已将fs2流转换为可观察到的Monix。

我是否需要以某种方式关闭fs2流或Observable才能使应用程序干净退出? 感谢所有可以正常使用的提示或如何正确调试的提示。

解决方法

问题是ExecutionContext需要关闭。参见作者的答案here

可以看到正确用法in the documentation

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...