是否可以在立即返回 HTTP 响应的同时使用 cat.effect.IO 和 FS2 Streams 将值异步推送到 Kafka?

问题描述

我试图解决的问题可以描述如下:

  1. k8s 调度程序每周一早上调用我的 Finagle/Finch 应用程序的 HTTP 端点
  2. HTTP 端点将处理请求并返回 IO[Int](立即)
  3. 在生成 HTTP 响应之前,应该启动另一个函数的执行,因此它会异步运行
  4. 异步进程应该使用来自使用 Doobie 生成的 Postgres RDB 的数据流
  5. 应该在恒定内存中操作该流,并且应该将数据推送到 Kafka 主题中
  val myHTTPEndpoint: Endpoint[IO,Int] =
    get("k8s-trigger") { () =>
      keyService
        .processKeys(LocalDateTime.now)
        .map(Ok)
    }
class KeyService {
  def processKeys(endDateTime: LocalDateTime): IO[Int] =
    for {
      numberOfKeys <- reportCardRepository.countReportCardKeys(endDateTime) // Doobie Repo
      _ <- IO.shift *> enqueueKeys(endDateTime,numberOfKeys,keysAccumulator = 0) // To be executed asynchronously
    } yield numberOfKeys

  def enqueueKeys(endDateTime: LocalDateTime,keysCounter: Int,keysAccumulator: Int): IO[Int] = ???

所以本质上我想从数据库中获取要处理的键的数量,触发 enqueueKeys() 的异步执行,这将获取 numberOfKeys 作为输入参数,并将这个数字按顺序返回给 HTTP 端点之后立即构建响应,无需等待 enqueueKeys() 完成其计算。

我想知道是否可以将 enqueueKeys() 作为批处理异步执行,该批处理应该发生在另一个线程池 - 另一个 ContextShift。使用 cats.effect.Async 或使用 FS2 是否可行?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...