Problem description
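Bear with me, I'm new to this. I'm trying to read a Kafka stream from a Zeppelin notebook, but no data is returned. However, when I read the topic from the command line, it does return data: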
C:\kafka_2.13-2.6.0>bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
Here is my code:
val sourcetopic = "quickstart-events"
val targetTopic = "sensor-processed"
val kafkaBootstrapServer = "127.0.0.1:9092"
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder.appName("Simple Application")
.config("spark.master","local").getOrCreate()
val rawData = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkaBootstrapServer)
.option("subscribe",sourcetopic)
.option("startingOffsets","earliest")
.load()
case class SensorData(id: String,ts: Long,value: Double)
import org.apache.spark.sql.Encoders
import sparkSession.implicits._
val schema = Encoders.product[SensorData].schema
val rawValues = rawData.selectExpr("CAST(value AS STRING)").as[String]
val visualizationQuery = rawValues.writeStream
.queryName("visualization")
.outputMode("append")
.format("memory")
.start()
val sampleDataset = sparkSession.sql("select * from visualization")
sampleDataset.count
The count returns 0, even though there should be two events.
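One plausible cause, offered as an assumption rather than a confirmed diagnosis: `start()` returns as soon as the streaming query is launched, and the memory sink is filled asynchronously by micro-batches. If `select * from visualization` runs before the first batch has been committed, the table is still empty and `count` returns 0. The sketch below waits for the data currently in the topic by calling `StreamingQuery.processAllAvailable()` (which Spark documents as intended mainly for testing) before counting. The object name `MemorySinkCount` and the `local[*]` master are illustrative choices, not part of the original code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object MemorySinkCount {
  // Starts the same Kafka -> memory-sink query as above, waits for the
  // records that are already in the topic, and returns the row count.
  def run(): Long = {
    val spark = SparkSession.builder
      .appName("Simple Application")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._ // provides the Encoder[String] needed by .as[String]

    try {
      val rawValues = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "127.0.0.1:9092")
        .option("subscribe", "quickstart-events")
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String]

      val query: StreamingQuery = rawValues.writeStream
        .queryName("visualization")
        .outputMode("append")
        .format("memory")
        .start()

      try {
        // Block until everything available in the source right now has been
        // processed and committed to the in-memory "visualization" table.
        query.processAllAvailable()
        spark.sql("select * from visualization").count()
      } finally {
        query.stop()
      }
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"rows in visualization: ${run()}")
}
```

Inside the Zeppelin paragraph itself, the equivalent change would be calling `visualizationQuery.processAllAvailable()` before the `select`, or simply re-running the `count` paragraph a few seconds later, since the memory table keeps growing as batches complete.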
My dependencies:
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7
org.scala-lang:scala-library:2.11.0
org.apache.spark:spark-core_2.11:2.4.7
org.apache.spark:spark-sql_2.11:2.4.7
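To separate a connection or dependency problem from streaming timing, a batch (non-streaming) read of the same topic can serve as a quick check. This is a minimal sketch, assuming the same broker address and topic as above and that the `spark-sql-kafka-0-10_2.11:2.4.7` package is actually loaded by the Spark/Scala version the Zeppelin interpreter runs. The names `KafkaBatchCheck` and `readTopic` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaBatchCheck {
  // Reads every record currently in the topic as a bounded (batch) DataFrame.
  def readTopic(spark: SparkSession, servers: String, topic: String): DataFrame =
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

  // Prints the topic's values and returns how many records were read.
  def run(): Long = {
    val spark = SparkSession.builder
      .appName("Kafka batch check")
      .master("local[*]")
      .getOrCreate()
    try {
      val values = readTopic(spark, "127.0.0.1:9092", "quickstart-events")
      values.show(truncate = false)
      values.count()
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"records in topic: ${run()}")
}
```

If this prints the two events, the broker connection and the Kafka package are likely fine and the problem is on the streaming side; if it fails or returns 0, the interpreter's dependency or connection setup is the more likely suspect.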