*Spark+Kafka构建实时分析*
*Dashboard 项目*
*一:实验环境准备*
*预备知识*
Linux系统命令使用、了解如何安装Python库、安装kafka。
*训练技能*
熟悉Linux基本操作、Pycharm的安装、Spark安装,Kafka安装,PyCharm安装。
*任务清单*
\1. Spark安装(略)
\2. Kafka安装
\3. Python安装(略)
\4. Python依赖库
\5. PyCharm安装(略)
一、系统和软件的安装
一、项目环境搭建。
(一)、spark搭建
我之前已搭建完成,在终端打开如下
pyspark
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2Ui4UJVc-1620658414735)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps1.jpg)]
(二)数据转移到Ubuntu
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LuKTadav-1620658414736)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps3.jpg)]
(三)、kafka环境搭建
把下载的kafka安装包解压到自己的目标文件夹下面,然后在如下操作:
`****切换到kafka的目录下****
****bin/zookeeper-server-start.sh /home/thc/spark/kafka_2.11-2.4.0/config/zookeeper.properties****`
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PI6w29jX-1620658414739)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps4.jpg)]
*****启动kafka*
重新打卡一个新的终端,切换到kafka的目录下
***\*bin/kafka-server-start.sh config/server.properties\****
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cr6G1kRT-1620658414740)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps5.jpg)]
*以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。********也是重新打开终端。*
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9AtQJA5g-1620658414742)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps6.jpg)]
在结果中查看到dblab这个topic存在
bin/kafka-topics.sh --list --zookeeper localhost:2181
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k6ObQGkY-1620658414742)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps7.jpg)]
用producer生产点数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RMQGcwWf-1620658414743)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps8.jpg)]
使用consumer来接收数据,重新打开新终端接受数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bDedMI09-1620658414744)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps9.jpg)]
到这里,数据可以完全接收,表示卡夫卡已经搭建成功了。
二、数据处理和Python操作Kafka
1、先安装Python操作Kafka的代码库,之前的pycharm已经搭建好了,所以就直接用了,可以在项目的终端使用pip安装。
pip install kafka-python
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5bh9VHMT-1620658414745)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps10.jpg)]
2、接着可以写如下Python代码,文件名为producer.py
import csv
import time
from kafka import KafkaProducer
\# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
\# 打开数据文件
csvfile = open("./user_log.csv","r",encoding="utf-8")
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
\# 发送数据,topic为'sex'
producer.send('sex',line[9].encode('utf8'))
上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LtMnq2RE-1620658414745)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps11.jpg)]
3、Python操作Kafka
我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer(‘sex’)
for msg in consumer:
print((msg.value).decode(‘utf8’))
在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,
\1. cd /home/thc/spark/kafka_2.11-2.4.0
\2. bin/zookeeper-server-start.sh config/zookeeper.properties &
\3. bin/kafka-server-start.sh config/server.properties
在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。开启方法如下:
可以在pycharm终端当中直接运行程序,也可以直接在pycharm当中运行。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P9cH1PQn-1620658414746)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps12.jpg)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-16MPQfAa-1620658414747)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps13.jpg)]
运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:
2
1
1
1
2
0
2
三:Spark Streaming实时处理数据
1、把spark-streaming-kafka-0-8_2.11-2.1.0.jar这个jar包复制到spark的jar包下面
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-635tinaL-1620658414747)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps14.jpg)]
2、然后在spark/jars目录下新建kafka目录,把 /kafka/libs下所有函数库复制到/spark/jars/kafka目录下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-idlBrjJJ-1620658414748)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps15.jpg)]
3、然后,修改 Spark 配置文件,把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh命令如下
\1. cd spark/conf
\2. vim spark-env.sh
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Le9iVKE-1620658414749)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps16.jpg)]
执行上述步骤之后,Spark开发Kafka环境即已配置好,下面介绍如何编码实现。
4、建立pySpark项目
新建项目目录:
然后在目录下创建一个kafka_test.py文件:代码如下
`from* kafka *import* KafkaProducer****from*** pyspark.streaming *import* StreamingContext
****from**** pyspark.streaming.kafka *import* KafkaUtils
****from**** pyspark *import* SparkConf, SparkContext
****import**** json
****import**** sys`
`***\*def\**** KafkaWordCount(zkQuorum, group, topics, numThreads):` `spark_conf = SparkConf().setAppName(***\*"KafkaWordCount"\****)` `sc = SparkContext(conf=spark_conf)` `sc.setLogLevel(***\*"ERROR"\****)` `ssc = StreamingContext(sc, 1)` `ssc.checkpoint(***\*"."\****)` `**#** **这里表示把检查点文件写入分布式文件系统****HDFS****,所以要启动****HadooP**** ** **# ssc.checkpoint(".")**** ** topicAry = topics.split(***\*","\****) **#** **将****topic****转换为****hashmaP****形式,而****python****中字典就是一种****hashmaP**** ** topicMap = {} ***\*for\**** topic ***\*in\**** topicAry:` `topicMap[topic] = numThreads` `lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(***\*lambda\**** x : x[1])` `words = lines.flatMap(***\*lambda\**** x : x.split(***\*" "\****))` `wordcount = words.map(***\*lambda\**** x : (x, 1)).reduceByKeyAndWindow((***\*lambda\**** x,y : x+y), (***\*lambda\**** x,y : x-y), 1, 1, 1)` `wordcount.foreachRDD(***\*lambda\**** x : sendmsg(x))` `ssc.start()` `ssc.awaitTermination()`
`**#** **格式转化,将****[["1", 3], ["0", 4], ["2", 3]]****变为****[{'1': 3}, {'0': 4}, {'2': 3}]****,这样就不用修改第四个教程的代码了**** *****\*def\**** Get_dic(rdd_list):` `res = []` `***\*for\**** elm ***\*in\**** rdd_list:` `tmp = {elm[0]: elm[1]}` `res.append(tmp)` `***\*return\**** json.dumps(res)`
``
***\*def\**** sendmsg(rdd):
***\*if\**** rdd.count != 0:
msg = Get_dic(rdd.collect())
`# 实例化一个KafkaProducer示例,用于向Kafka投递消息**
** producer = KafkaProducer(bootstrap_servers=****‘localhost:9092’****)
producer.send(****“result”****, msg.encode(****‘utf8’**))
# 很重要,不然不会更新
** producer.flush()
*if* name == *‘main’*:# 输入的四个参数分别代表着
** # 1.zkQuorum为zookeeper地址
** # 2.group为消费者所在的组
** # 3.topics该消费者所消费的topics
** # 4.numThreads开启消费topic线程的个数
** *if* (len(sys.argv) < 5):print(*"Usage: KafkaWordCount "***)
exit(1)
zkQuorum = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3]
numThreads = int(sys.argv[4])
print(group, topics)
KafkaWordCount(zkQuorum, group, topics, numThreads)`
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YhfQyiRJ-1620658414750)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps17.jpg)]
如果代码当中出现这样子的报错,那么我们可以在/home/thc/spark/spark-2.3.3-bin-hadoop2.7/python/pyspark/streaming
找到kafka.py 复制到python里面去
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0ijzP5nc-1620658414751)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps18.jpg)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QFdoPFZK-1620658414752)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps19.jpg)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qy7H9Cnq-1620658414752)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps20.jpg)]
上述代码注释已经也很清楚了,下面在简要说明下:
\1. 首先按每秒的频率读取Kafka消息;
\2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
\3. 最后将上述结果封装成json发送给Kafka。
ssc.checkpoint(".")
这行代码表示把检查点文件写入分布式文件系统HDFS,所以一定要事先启动Hadoop。如果没有启动Hadoop,则后面运行时会出现“拒绝连接”的错误提示。如果你还没有启动Hadoop,则可以现在在Ubuntu终端中,使用如下Shell命令启动Hadoop:
\1. cd /home/spark/hadoop #这是hadoop的安装目录
\2. ./sbin/start-dfs.sh
另外,如果不想把检查点写入HDFS,而是直接把检查点写入本地磁盘文件(这样就不用启动Hadoop),则可以对ssc.checkpoint()方法中的文件路径进行指定,比如下面这个例子:
ssc.checkpoint(“file:///home/spark/spark/mycode/kafka/checkpoint”)
5、运行项目
编写好程序之后,接下来编写运行脚本,在/home/spark/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:
/home/spark/spark/bin/spark-submit /home/spark/spark/mycode/kafka/kafka_test.py 127.0.0.1:2181 1 sex 1
其中最后四个为输入参数,含义如下
\1. 127.0.0.1:2181为Zookeeper地址
\2. 1 为consumer group标签
\3. sex为消费者接收的topic
\4. 1 为消费者线程数
最后在/home/thc/PycharmProjects/spark_test/Srraming/目录下,运行如下命令即可执行刚编写好的Spark Streaming程序
sh startup.sh
也可以直接把上面的步骤在相应的shell当中去执行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SwagIsdx-1620658414753)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps21.jpg)]
程序运行成功之后,下面通过步骤二的KafkaProducer和KafkaConsumer来检测程序。
下面开启之前编写的KafkaProducer投递消息,然后将KafkaConsumer中接收的topic改为result,验证是否能接收topic为result的消息,更改之后的KafkaConsumer为
from kafka import KafkaConsumer
consumer = KafkaConsumer(‘result’)
for msg in consumer:
print((msg.value).decode(‘utf8’))
在同时开启Spark Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EidCIejZ-1620658414754)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps22.jpg)]
到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。需要数据和源码的朋友可以私信我,刚开始写,做的不好的地方请多多指教,向大家学习。
from kafka import KafkaConsumer
consumer = KafkaConsumer(‘result’)
for msg in consumer:
print((msg.value).decode(‘utf8’))
在同时开启Spark Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出:
[外链图片转存中…(img-EidCIejZ-1620658414754)]
到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。需要数据和源码的朋友可以私信我,刚开始写,做的不好的地方请多多指教,向大家学习。