无法使用Spark从Kafka读取数据

问题描述

与我同住,我是新手。我正在尝试从齐柏林飞艇的笔记本中读取kafka流,但未返回任何数据。但是,当我尝试从命令行读取主题时,它实际上确实会返回数据。

C:\kafka_2.13-2.6.0>bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 

This is my first event
This is my second event

这是我的代码

val sourcetopic = "quickstart-events" 
val targetTopic = "sensor-processed"
val kafkaBootstrapServer = "127.0.0.1:9092"

import org.apache.spark.sql.SparkSession

 val sparkSession = SparkSession.builder.appName("Simple Application")
    .config("spark.master","local").getorCreate()

val rawData = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",kafkaBootstrapServer)
    .option("subscribe",sourcetopic)
    .option("startingOffsets","earliest")
    .load()

case class SensorData(id: String,ts: Long,value: Double)

import org.apache.spark.sql.Encoders
val schema = Encoders.product[SensorData].schema

val rawValues = rawData.selectExpr("CAST(value AS STRING)").as[String]

val visualizationQuery = rawValues.writeStream
    .queryName("visualization")
    .outputMode("append")
    .format("memory")
    .start()

val sampleDataset = sparkSession.sql("select * from visualization")

sampleDataset.count

当应该有两个事件时,计数返回0。

我的依赖项

org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7    
org.scala-lang:scala-library:2.11.0 
org.apache.spark:spark-core_2.11:2.4.7  
org.apache.spark:spark-sql_2.11:2.4.7   

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...