问题描述
我想将一长串事件发布到 Kafka 中,使用 fs2.Stream 对应一个非常大的 DB 行列表,如果编译为 List,最终将导致 Out Of Memotry 错误。
所以,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:
def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO,UUID]
并且我想使用这个发布者将一个事件发布到 Kafka 中,对应于 chunk 500 个键:
trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}
我想创建一个函数来将此流加入/发布到 Kafka 中:
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}
如何使用由 DB 发起的流,将其拆分为大小恒定的块,然后将每个块发布到 Kafka 中?
显然很难找到有关此主题的良好文档或示例。你能指出我正确的方向吗?
解决方法
由于您没有说明 ChunkOfKeys
是什么类型,我假设它类似于 Chunk[UUID]
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
xa: Transactor[IO],publisher: KeyPublisher
): IO[Unit] =
getKeyStream(endDateTime)
.transact(xa) // Convert the ConnectionIO stream to Stream[IO,UUID]
.chunkN(500) // into Stream[IO,Chunk[UUID]]
.evalMap(publisher.publish) // Into Stream[IO,Long]
.compile
.drain // An IO[Unit] that describes the whole process