如何在客户端断开连接的情况下流式传输 fs2.Queue 而不会丢失项目

问题描述

我需要将 fs2.Queue - 一些事件流式传输到单个 http 客户端。

import cats.effect._
import org.http4s._
import org.http4s.dsl.io._
import org.http4s.server.Router
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.syntax.kleisli._

import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt

object StreamingHttp4sFs2Server2 extends App {

  implicit val timer: Timer[IO] = IO.timer(global)
  implicit val cs: ContextShift[IO] = IO.contextShift(global)

  (for {
    queue <- fs2.concurrent.Queue.unbounded[IO,Long]
    _ <- IO {
      fs2.Stream.awakeEvery[IO](1.seconds).map(_.toSeconds)
        .through(queue.enqueue)
        .compile.drain
        .unsafeRunAsyncAndForget()
    }
    _ <- BlazeServerBuilder[IO](ExecutionContext.global)
      .bindHttp(8080,"localhost")
      .withHttpApp(Router("/" -> HttpRoutes.of[IO] {
        case GET -> Root => Ok(queue.dequeue.map(_.toString + "\n"))
      }).orNotFound)
      .serve
      .compile
      .drain
  } yield ()).unsafeRunAsyncAndForget()
}

有时客户端断开连接,一些事件丢失并显示错误 Error writing body

您可以使用 curl 重现问题:

curl --no-buffer -X GET http://localhost:8080/
1
2
3
4
5
^C //hit ctrl+c,curl --no-buffer -X GET http://localhost:8080/
8
9
10

如何在客户端断开连接时不丢失事件的情况下实现某些东西?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...