Apache Spark与Kafka的集成

问题描述

我正在学习有关Udemy的关于Kafka和Spark的课程,并且正在学习与Kafka的Apache Spark集成

下面是Apache Spark的代码

SparkSession session = SparkSession.builder().appName("KafkaConsumer").master("local[*]").getorCreate();
  session.sparkContext().setLogLevel("ERROR");
  Dataset<Row> df = session
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    .option("subscribe","second_topic").load();
            
df.show();

下面是pom.xml文件内容

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.kafka.spark</groupId>
  <artifactId>Kafka-Spark-Integration-Code</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<!--    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> -->
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
   </dependency>
    
 </dependencies>
</project>

但是,当我运行代码时,出现以下错误,但无法解决。我在MX Linux上使用openjdk 8和spark 3。谢谢

exception in thread "main" java.lang.classFormatError: Invalid code attribute name index 24977 in class file org/apache/spark/sql/execution/columnar/InMemoryRelation
    at java.lang.classLoader.defineClass1(Native Method)
    at java.lang.classLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.urlclassloader.defineClass(urlclassloader.java:468)
    at java.net.urlclassloader.access$100(urlclassloader.java:74)
    at java.net.urlclassloader$1.run(urlclassloader.java:369)
    at java.net.urlclassloader$1.run(urlclassloader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.urlclassloader.findClass(urlclassloader.java:362)
    at java.lang.classLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.classLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:83)
    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:132)
    at scala.Option.getorElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:132)
    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:131)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:323)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:157)
    at scala.Option.getorElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
    at org.apache.spark.sql.streaming.DataStreamReader.<init>(DataStreamReader.scala:519)
    at org.apache.spark.sql.SparkSession.readStream(SparkSession.scala:657)
    at example.code.spark.kafka.KafkaSparkConsumer.main(KafkaSparkConsumer.java:19)

解决方法

您可以按照Structured Streaming + Kafka Integration Guide中给出的示例进行操作:

SparkSession session = SparkSession.builder()
  .appName("KafkaConsumer")
  .master("local[*]")
  .getOrCreate();

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe","second_topic")
  .load()
  .selectExpr("CAST(key AS STRING)","CAST(value AS STRING)");

使用数据。 Structured Streaming Programming Guide向您展示如何将数据打印到控制台:

StreamingQuery query = df
  .writeStream()
  .format("console")
  .outputMode("append")
  .option("checkpointLocation","path/to/checkpoint/dir")
  .start();

query.awaitTermination();