如何将 Doobie 生成的 FS2 Stream 发布到 Kafka

问题描述

我想将一长串事件发布到 Kafka 中,使用 fs2.Stream 对应一个非常大的 DB 行列表,如果编译为 List,最终将导致 Out Of Memotry 错误。

所以,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:

def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO,UUID]

并且我想使用这个发布者将一个事件发布到 Kafka 中,对应于 chunk 500 个键:

trait KeyPublisher {
  def publish(event: ChunkOfKeys): IO[Long]
}

我想创建一个函数来将此流加入/发布到 Kafka 中:

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
  getKeyStream(endDateTime)
     .chunkN(500)
     .evalMap(myChunk => ?????)
     ...
}

如何使用由 DB 发起的流,将其拆分为大小恒定的块,然后将每个块发布到 Kafka 中?

显然很难找到有关此主题的良好文档或示例。你能指出我正确的方向吗?

解决方法

由于您没有说明 ChunkOfKeys 是什么类型,我假设它类似于 Chunk[UUID]

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
    xa: Transactor[IO],publisher: KeyPublisher
): IO[Unit] =
  getKeyStream(endDateTime)
    .transact(xa) // Convert the ConnectionIO stream to Stream[IO,UUID]
    .chunkN(500)  // into Stream[IO,Chunk[UUID]]
    .evalMap(publisher.publish)  // Into Stream[IO,Long]
    .compile
    .drain // An IO[Unit] that describes the whole process

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...