解决:IDEA中import kafka.serializer.StringDecoder导入报红的问题

解决:IDEA中import kafka.serializer.StringDecoder导入报红的问题

kafka的版本是2.1.1,spark-streaming-kafka-0-8_2.11

在SparkStreaming整合Kafka时,采用direct方法。在手动导入kafka.serializer.StringDecoder时,一直报红。

解决方法一:手动在代码头部添加

import _root_.kafka.serializer.StringDecoder

解决方法二:暂时删除之前创建的java->spark.kafka文件夹,因为会有冲突,删除后未报错或者将spark.kafka改个名字也可以。。。

代码如下:

package spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import _root_.kafka.serializer.StringDecoder

/**
  * Spark Streaming对接Kafka的方式
  */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String,String]("Metadata.broker.list"-> brokers)

    // Todo... Spark Streaming如何对接Kafka
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
    ssc,kafkaParams,topicsSet
    )

    // Todo... 自己去测试为什么要取第二个
    messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

相关文章

1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展A...
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,...