问题描述
我需要将 fs2.Queue
- 一些事件流式传输到单个 http 客户端。
import cats.effect._
import org.http4s._
import org.http4s.dsl.io._
import org.http4s.server.Router
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.syntax.kleisli._
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
object StreamingHttp4sFs2Server2 extends App {
implicit val timer: Timer[IO] = IO.timer(global)
implicit val cs: ContextShift[IO] = IO.contextShift(global)
(for {
queue <- fs2.concurrent.Queue.unbounded[IO,Long]
_ <- IO {
fs2.Stream.awakeEvery[IO](1.seconds).map(_.toSeconds)
.through(queue.enqueue)
.compile.drain
.unsafeRunAsyncAndForget()
}
_ <- BlazeServerBuilder[IO](ExecutionContext.global)
.bindHttp(8080,"localhost")
.withHttpApp(Router("/" -> HttpRoutes.of[IO] {
case GET -> Root => Ok(queue.dequeue.map(_.toString + "\n"))
}).orNotFound)
.serve
.compile
.drain
} yield ()).unsafeRunAsyncAndForget()
}
有时客户端断开连接,一些事件丢失并显示错误 Error writing body
。
您可以使用 curl 重现问题:
curl --no-buffer -X GET http://localhost:8080/
1
2
3
4
5
^C //hit ctrl+c,curl --no-buffer -X GET http://localhost:8080/
8
9
10
如何在客户端断开连接时不丢失事件的情况下实现某些东西?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)