Reading the tail of a Kafka topic in Spark

Problem description

I need to subscribe to a Kafka topic at the latest offsets, read a handful of the most recent records, print them, and finish. How do I do this in Spark? I imagine something like this:

sqlContext
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
    .option("subscribe", "myTopic")
    .option("startingOffsets", "latest")
    .load()                                  // load() was missing in the original sketch
    .filter($"someField" === "someValue")
    .limit(10)                               // take(10) returns Array[Row], which has no show()
    .show()

Solution

You need to know in advance the offsets of the partitions you want to consume from Kafka. If you have that information, you can do the following:

// Subscribe to a topic, specifying explicit Kafka offsets per partition
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
  .option("subscribe", "myTopic")
  .option("startingOffsets", """{"myTopic":{"0":20,"1":20}}""")
  .option("endingOffsets", """{"myTopic":{"0":25,"1":25}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .filter(...)

The Kafka + Spark Integration Guide has more details on startingOffsets and endingOffsets.
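Since the question asks for the most recent records, one approach is to compute per-partition starting offsets as "end offset minus N" and format them as the JSON that the startingOffsets option expects. This is a sketch: the end offsets here are a placeholder map, and in practice you would fetch them yourself (for example via Kafka's consumer API) before building the option string.

```scala
// Build the JSON string for Spark's startingOffsets option from a map of
// partition -> end offset, backing off `n` records per partition.
// The endOffsets map is a placeholder; obtaining real end offsets from the
// broker is not shown here.
def offsetsJson(topic: String, endOffsets: Map[Int, Long], n: Long): String = {
  val perPartition = endOffsets
    .map { case (partition, end) =>
      val start = math.max(0L, end - n) // never go below the start of the partition
      s""""$partition":$start"""
    }
    .mkString(",")
  s"""{"$topic":{$perPartition}}"""
}

// Example: partitions 0 and 1 both end at offset 25; read the last 5 records each.
val json = offsetsJson("myTopic", Map(0 -> 25L, 1 -> 25L), 5)
// json == """{"myTopic":{"0":20,"1":20}}"""
```

The resulting string can be passed directly to .option("startingOffsets", json) in the batch read shown above.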
