Problem description
I need to subscribe to a Kafka topic from the latest offset, read a few of the most recent records, print them, and finish. How do I do this in Spark? I imagine I could do something like this:
sqlContext
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
  .option("subscribe", "myTopic")
  .option("startingOffsets", "latest")
  .filter($"someField" === "someValue")
  .take(10)
  .show
Solution
You need to know in advance which offsets, in which partitions, you want to consume from Kafka. (Note that per the integration guide, for batch queries `latest` is not allowed as a starting offset.) If you have that information, you can do the following:
// Subscribe to a topic, specifying explicit Kafka offsets per partition
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
  .option("subscribe", "myTopic")
  .option("startingOffsets", """{"myTopic":{"0":20,"1":20}}""")
  .option("endingOffsets", """{"myTopic":{"0":25,"1":25}}""")
  .load()

// needed for .as[(String, String)]
import spark.implicits._

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .filter(...)
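If you do not know the latest offsets up front, one way to discover them at runtime is the plain Kafka consumer API. This is a sketch, not part of the original answer: it assumes the kafka-clients library is on the classpath, and reuses the broker address and topic name from the question as placeholders. `lastN` is a hypothetical parameter for how many trailing records per partition you want.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "192.168.1.1:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

// Find all partitions of the topic, then ask the broker for their end offsets
val partitions = consumer.partitionsFor("myTopic").asScala
  .map(p => new TopicPartition(p.topic, p.partition))
val endOffsets = consumer.endOffsets(partitions.asJava).asScala
consumer.close()

// Back off lastN records from the end of each partition (floored at 0)
val lastN = 5L
val starting = endOffsets.map { case (tp, end) =>
  tp.partition -> math.max(end - lastN, 0L)
}
```

From `starting` and `endOffsets` you can then build the JSON strings that `startingOffsets` and `endingOffsets` expect.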
The Kafka + Spark Integration Guide has more details on startingOffsets and endingOffsets.
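As a configuration aside from that guide: inside the offsets JSON, -2 refers to "earliest" and -1 to "latest" (and -1 is not accepted in `startingOffsets` for batch queries). So an equivalent way to read a whole topic in batch is, for example:

```scala
// Fragment only; goes inside a spark.read.format("kafka") chain as above
.option("startingOffsets", """{"myTopic":{"0":-2,"1":-2}}""") // -2 = earliest
.option("endingOffsets",   """{"myTopic":{"0":-1,"1":-1}}""") // -1 = latest
```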