问题描述
该主题有超过 100000 条消息。当我运行我的程序时,spark Streaming 只在第一个 RDD 中有数据,后面的 RDD 什么都没有,这让我很困惑。
另外,我发现即使我手动更改偏移量,流式传输也总是在第一个 RDD 打印相同的消息。
感谢任何建议或帮助!!!
代码如下:
object Main
{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("XXX")
.setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(3))
ssc.sparkContext.setLogLevel("ERROR")
val topics = List("rd_cyg")
val offSet = RedisUtil.getoffsetInfo(topics(0))
val kafkaParams: Map[String,Object] = Map(
"bootstrap.servers" -> "XXX","group.id" -> "XXX","key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer","value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer","serializer.class" -> "kafka.serializer.StringEncoder","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream(
ssc,locationStrategy = LocationStrategies.PreferConsistent,consumerStrategy = ConsumerStrategies.Subscribe[String,String](topics,kafkaParams,offSet)
)
stream.map(_.value())
.print(2)
ssc.start()
ssc.awaitTermination()
ssc.stop(true,true)
}
}
具体偏移位置为
Map(XXX-0 -> 223,XXX-2 -> 224,XXX-1 -> 223)
控制台输出是
-------------------------------------------
Time: 1623136371000 ms
-------------------------------------------
first-message
second-message
...
-------------------------------------------
Time: 1623136374000 ms
-------------------------------------------
-------------------------------------------
Time: 1623136377000 ms
-------------------------------------------
部分敏感信息被XXX替换。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)