Spark 2.1.2 Streaming + Kafka 1.1.0 -- 在IDEA中通过Maven创建Spark项目

一、开发环境中需要安装和配置如下
安装JDK,配置JDK环境变量(jdk1.8)
安装Scala,配置JDK环境变量(scala2.11.8)
最好安装一个Maven,虽然Idea已经集成自带的有Maven
测试环境中已经安装有Zookeeper集群,Kafka需要用到(3.4.5)
测试环境中已经安装有Kafka集群(1.1.0)
测试环境中已经安装有Spark集群(2.1.2)

二、创建Spark项目
1. 打开Idea
2.配置Maven
如果是Window系统 依次打开 File –> Settings –> Build,Execution,Deployment –> Build Tools –> Maven ;如果是 Mac 系统 IntelliJ IDEA –> Preferences –> Build,Execution,Deployment –> Build Tools

选定Mavne的安装目录(也可以用IDEA自带的,那就不用修改此路径)

勾选Override,后修改Maven本地仓库所在的位置

 


修改Maven设置文件添加本地仓库位置的设置,同时添加国内镜像,Maven设置文件setting.xml认会加载主目录下~/.m2/setting.xml的文件,所以以防设置没有生效而下载的依赖包又保存到C盘了,最好此目录下也放置一份,在<settings>标签添加<localRepository>Maven_repo路径</localRepository>,然后再在<settings>标签内的<mirrors>中添加阿里的国内镜像

<!-- 配置阿里云的镜像仓库 -->
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>

3.安装Scala插件
我们做Spark基本以Scala开发为主,因此先在Idea中安装一下Scala插件

依次打开 File –> Settings –> Plugins –> browse Repositories… 搜索Scala,安装即可

 

4.创建一个Maven项目
一次点击 File –> New –> Project…

左侧选中Maven,右侧选择我们项目的JDK版本,以及把Create from archetype勾上,

在下面找到 org.apache.camel.archetypes:camel-archetype-scala,点击Next

填写GroupID(这个一般是公司域名反写)和ArtifactId(项目名或者模块名)

 

5.修改项目Scala环境
快捷键 Ctrl + Shift + Alt + S

选中左侧的Global Libraries 点击 + 将我们本地的Scala加载到项目环境中,也可以点击工具中的Download… 按钮选择对应的版本下载

 

6.修改pom文件
pom文件添加如下依赖和配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.yore.spark</groupId>
<artifactId>spark-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>spark-demo with integration Kafka</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.2</spark.version>
<!--<spark.version>2.2.1</spark.version>-->
<kafka.version>1.1.0</kafka.version>
<commons-lang.version>2.6</commons-lang.version>
</properties>


<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>


<dependencies>
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_${scala.binary.version}</artifactId>
<version>1.0.6</version>
</dependency>

<!-- spark -->
<!-- spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming-flume -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Spark中的RPC是使用Akka实现的, -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.4</version>
</dependency>


<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>

<!-- commons-lang -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>

<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.11.0</version>
<!--<scope>runtime</scope>-->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.0</version>
<!--<scope>runtime</scope>-->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.0</version>
<!--<scope>runtime</scope>-->
</dependency>

<!-- specs -->
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.4.3</version>
<scope>test</scope>
</dependency>

<!-- testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<!--<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>-->
<plugins>
<!-- the Maven compiler plugin will compile Java source files -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<!--<source>1.8</source>-->
<!--<target>1.8</target>-->
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.3</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.yore.spark.kafka.SparkKafkaDemo</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- the Maven Scala plugin will compile Scala source files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<!--surefire plugin,avoid messy code when mvn test console -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<configuration>
<skipTests>true</skipTests>
<forkMode>once</forkMode>
<argLine>-Dfile.encoding=UTF-8</argLine>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>${basedir}/src/main/scala</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>

</project>
7.增加项目配置文件
在Spark 的Maven项目里的resources目录下新建一个my.properties文件,配置如下内容

# kafka configs
kafka.bootstrap.servers=cdh6:6667,cdh5:6667,cdh4:6667
kafka.topic.source=spark-kafka-demo
kafka.topic.sink=spark-sink-test
kafka.group.id=spark_demo_gid1
8.新建包
在scala下创建com.yore.spark包

在上面包下分别新建kafka包和utils包

在utils包下新建Scala Class ,Name: PropertiesUtil,Kind:Object

package com.yore.spark.utils

import java.util.Properties

/**
* Properties的工具类
*
* Created by yore on 2017-11-9 14:05
*/
object PropertiesUtil {

/**
*
* 获取配置文件Properties对象
*
* @author yore
* @return java.util.Properties
*/
def getProperties() :Properties = {
val properties = new Properties()
//读取源码中resource文件夹下的my.properties配置文件
val reader = getClass.getResourceAsstream("/my.properties")
properties.load(reader)
properties
}

/**
*
* 获取配置文件中key对应的字符串值
*
* @author yore
* @return java.util.Properties
*/
def getPropString(key : String) : String = {
getProperties().getProperty(key)
}

/**
*
* 获取配置文件中key对应的整数值
*
* @author yore
* @return java.util.Properties
*/
def getPropInt(key : String) : Int = {
getProperties().getProperty(key).toInt
}

/**
*
* 获取配置文件中key对应的布尔值
*
* @author yore
* @return java.util.Properties
*/
def getPropBoolean(key : String) : Boolean = {
getProperties().getProperty(key).toBoolean
}

}
在kafka包下新建Scala Class ,Name: KafkaSink,Kind:Object和Name: SparkKafkaDemo,Kind:Object

KafkaSink.scala

package com.yore.spark.kafka

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

/**
* 手动实现一个KafkaSink类,将数据发送到Kafka<br/>
* 在构造时传入高阶函数,获得一个生产者<br/>
*
* 同时创建一个对应的伴生对象,定义apply方法,这样使用时不用new
*
* This is the key idea that allows us to work around running into NotSerializableExceptions.
*
* Created by yore on 2017-12-14 9:40
*/
class KafkaSink[K,V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
lazy val producer = createProducer()

/** 发送消息 */
def send(topic : String, key : K, value : V) : Future[RecordMetadata] =
producer.send(new ProducerRecord[K,V](topic,key,value))
def send(topic : String, value : V) : Future[RecordMetadata] =
producer.send(new ProducerRecord[K,V](topic,value))
}


object KafkaSink {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}

SparkKafkaDemo.scala

package com.sinosig.spark.kafka

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import com.sinosig.spark.utils.PropertiesUtil
import org.apache.commons.lang3.StringUtils
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.broadcast
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
* SparkStreaming
* kafka --> Spark --> Kafka
*
* Created by yore on 2018-06-29 9:44
*/
object SparkKafkaDemo extends App{
// default a Logger Object
val LOG = org.slf4j.LoggerFactory.getLogger(SparkKafkaDemo.getClass)

/*if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}*/
// 设置日志级别
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

val Array(brokers, topics , outTopic) = /*args*/ Array(
PropertiesUtil.getPropString("kafka.bootstrap.servers"),
PropertiesUtil.getPropString("kafka.topic.source") ,
PropertiesUtil.getPropString("kafka.topic.sink")
)


// Create context
/* 第一种方式 */
val sparkConf = new SparkConf().setMaster("local[2]")
sparkConf.setAppName("spark-kafka-demo1")
val ssc = new StreamingContext(sparkConf,Milliseconds(1000))

/* 第二种方式 */
/*val spark = SparkSession.builder()
.appName("spark-kafka-demo1")
.master("local[2]")
.getorCreate()
// 引入隐式转换方法,允许ScalaObject隐式转换为DataFrame
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext,Seconds(1))*/

// 设置检查点
ssc.checkpoint("spark_demo_cp1")

// Create direct Kafka Stream with brokers and Topics
// 注意:这个Topic最好是Array形式的,set形式的匹配不上
//var topicSet = topics.split(",")/*.toSet*/
val topicsArr = topics.split(",")

// set Kafka Properties
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> PropertiesUtil.getPropString("kafka.group.id"),
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)

/**
* createStream是Spark和Kafka集成包0.8版本中的方法,它是将offset交给ZK来维护的
*
* 在0.10的集成包中使用的是createDirectStream,它是自己来维护offset,
* 速度上要比交给ZK维护要快很多,但是无法进行offset的监控。
* 这个方法只有3个参数,使用起来最为方便,但是每次启动的时候认从Latest offset开始读取,
* 或者设置参数auto.offset.reset="smallest"后将会从Earliest offset开始读取。
*
* 官方文档@see <a href="http://spark.apache.org/docs/2.1.2/streaming-kafka-0-10-integration.html">Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)</a>
*
*/
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsArr, kafkaParams)
)

/** Kafak sink */
// 广播KafkaSink
val kafkaProducer: broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", brokers)
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
LOG.info("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

var jsonObject = new JSONObject()
stream.filter(record =>{
// 过滤掉不符合要求的数据
try {
// println("$$$\t" + record.key + "\t" + record.value)
jsonObject = JSON.parSEObject(record.value)
}catch {
case e : Exception =>{
LOG.error("转换为JSON时发生了异常!\t{}",e.getMessage)
}
}
// 如果不为空字符时,为null,返回false过滤,否则为true通过
StringUtils.isNotEmpty(record.value) && null != jsonObject
}).map(record =>{
/** 这个地方可以写自己的业务逻辑代码,因为本次是测试,简单返回一个元组 */
jsonObject = JSON.parSEObject(record.value)
// 返出一个元组,(时间戳,json的数据日期,json的关系人姓名)
(System.currentTimeMillis(),
jsonObject.getString("date_dt"),
jsonObject.getString("relater_name")
)
}).foreachRDD(rdd =>{
if(!rdd.isEmpty()){
rdd.foreach(kafkaTuple =>{
//向Kafka发送数据
kafkaProducer.value.send(
outTopic,
kafkaTuple._1 + "\t"+ kafkaTuple._2 + "\t" + kafkaTuple._3
)
//打印到控制台
println(kafkaTuple._1 + "\t"+ kafkaTuple._2 + "\t" + kafkaTuple._3)
})
}
})

// 启动
ssc.start()
//等待关闭
ssc.awaitTermination()

}
三、启动测试
(1)远程访问Kafka节点,创建两个Topic

[root@cdh6 kafka]# ./bin/sh kafka-topics.sh --create --zookeeper cdh2:2181,cdh3:2181,cdh4:2181,cdh5:2181,cdh6:2181 --partitions 3 --replication-factor 1 –topic spark-kafka-demo
[root@cdh6 kafka]# ./bin/sh kafka-topics.sh --create --zookeeper cdh2:2181,cdh3:2181,cdh4:2181,cdh5:2181,cdh6:2181 --partitions 1 --replication-factor 1 –topic spark-sink-test
(2)运行SparkKafkaDemo

 

(3)在Kafka节点上,以控制台形式启动一个生产者

[root@cdh6 kafka]# ./bin/kafka-console-producer.sh --broker-list cdh6:6667,cdh5:6667,cdh4:6667 --topic spark-kafka-demo

(4)复制一条Json数据到生产者的控制台,格式如

{"date_dt": "1516095986393","relater_name": "yore"}
回车发送

 

(5)查看Idea控制台运行情况,是否报错,打印的信息

 

(6)在Kafka节点上,以控制台形式启动一个消费者,看结果是否已经推进来

[root@cdh6 kafka]# ./bin/ kafka-console-consumer.sh --bootstrap-server cdh6:6667,cdh5:6667,cdh4:6667 --from-beginning --topic spark-sink-test

 

四、打包
使用IDEA中的Maven项目的打包工具直接打包

在项目中右侧找到Maven Projects –> 项目名字(spark-demo with integration Kafka)–> package 点击开始打包。如果右侧找不到Maven Projects 可以点击一下Idea工具左下角的图标。
打包完成会在项目的根目录下target/目录下生成两个jar包
spark-demo-1.0-SNAPSHOT.jar
spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
分别是不带依赖的项目的源码编译的jar包,带所有依赖的jar

 

五、提交到集群
到target/将带依赖的jar包上传的Spark集群以standalone client 方式提交到集群运运行

[root@cdh4 spark2.1]# ./bin/spark-submit --master spark://cdh4:7077 --deploy-mode client \

--executor-memory 2g --total-executor-cores 2 --driver-memory 2G \

--class com.yore.spark.kafka.SparkKafkaDemo ~/yore/spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
在source的Topic中推送一条数据,用client模式可以在控制台查看打印的运行结果,同时查看落数据Topic中是否有数据


————————————————
版权声明:本文为CSDN博主「YoreYuan」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/github_39577257/article/details/81151137

相关文章

1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展A...
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,...