问题描述
我试图解决的问题可以描述如下:
- k8s 调度程序每周一早上调用我的 Finagle/Finch 应用程序的 HTTP 端点
- HTTP 端点将处理请求并返回
IO[Int]
(立即) - 在生成 HTTP 响应之前,应该启动另一个函数的执行,因此它会异步运行
- 异步进程应该使用来自使用 Doobie 生成的 Postgres RDB 的数据流
- 应该在恒定内存中操作该流,并且应该将数据推送到 Kafka 主题中
val myHTTPEndpoint: Endpoint[IO,Int] =
get("k8s-trigger") { () =>
keyService
.processKeys(LocalDateTime.now)
.map(Ok)
}
class KeyService {
def processKeys(endDateTime: LocalDateTime): IO[Int] =
for {
numberOfKeys <- reportCardRepository.countReportCardKeys(endDateTime) // Doobie Repo
_ <- IO.shift *> enqueueKeys(endDateTime,numberOfKeys,keysAccumulator = 0) // To be executed asynchronously
} yield numberOfKeys
def enqueueKeys(endDateTime: LocalDateTime,keysCounter: Int,keysAccumulator: Int): IO[Int] = ???
所以本质上我想从数据库中获取要处理的键的数量,触发 enqueueKeys()
的异步执行,这将获取 numberOfKeys 作为输入参数,并将这个数字按顺序返回给 HTTP 端点之后立即构建响应,无需等待 enqueueKeys()
完成其计算。
我想知道是否可以将 enqueueKeys()
作为批处理异步执行,该批处理应该发生在另一个线程池 - 另一个 ContextShift。使用 cats.effect.Async 或使用 FS2 是否可行?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)