如何终止从 SBT Shell 启动的 FS2 流?

问题描述

如果我从 SBT shell 运行这个程序,然后取消它,它会继续打印“hello”。我必须退出 SBT 才能让它停止。这是为什么?

import cats.effect.{ExitCode,IO,IOApp}
import fs2.Stream
import scala.concurrent.duration._

object FS2 extends IOApp {

  override def run(args: List[String]) = 
      Stream.awakeEvery[IO](5.seconds).map { _ =>
        println("hello")
      }.compile.drain.as(ExitCode.Error)
}

解决方法

正如评论中已经提到的,您的应用程序在另一个线程中运行并且它永远不会终止,因为流是无限的,因此当应用程序收到 SIGTERM 或 SIGINT 等信号时,您必须手动终止它(每当您点击 ctrl+c 终止应用程序时,它就会发出。

你可以这样做:

  1. 创建一个 Deferred 实例
  2. 使用它在接收到任何 TERM 或 INT 信号后触发 interruptWhen

例如:

object FS2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = for {
    cancel <- Deferred[IO,Either[Throwable,Unit]] //deferred used as flat telling if terminations signal
                                                    //was received
    _ <- (IO.async[Unit]{ cb =>
      Signal.handle(
        new Signal("INT"),//INT and TERM signals are nearly identical,we have to handle both
        (sig: Signal) => cb(Right(()))
      )
      Signal.handle(
        new Signal("TERM"),(sig: Signal) => cb(Right(()))
      )
    } *> cancel.complete(Right(()))).start //after INT or TERM signal is intercepted it will complete
                                           //deferred and terminate fiber
                                           //we have to run method start to run waiting for signal in another fiber
                                           //in other case program will block here
    app <- Stream.awakeEvery[IO](1.seconds).map { _ => //your stream
      println("hello")
    }.interruptWhen(cancel).compile.drain.as(ExitCode.Error)  //interruptWhen ends stream when deferred completes
  } yield app

}

只要您在 sbt shell 中点击 ctrl + c,此版本的应用就会终止。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...