流数据
大数据的两种存在形式:静态和动态
静态大数据:已经积累产生并存在那里的大数据
动态大数据:随着时间的推移不断的产生的大数据
各种摄像头的监控数据
12306的订票请求
银行的交易请求
Storm
最早是由Nathan Marz和他的团队于2010年在数据分析公司BackType开发
2011年BackType公司被Twitter收购,接着Twitter开源Storm
2014年成为Apache顶级项目
Storm被业界称为实时版的Hadoop,它与Hadoop、Spark并称为Apache基金会三大顶级的开源项目,是当前流计算技术中的佼佼者和主流
它将数据流中的数据以元组的形式不断的发送给集群中的不同节点进行分布式处理,能够实现真正的实时处理
Spark streaming
Spark Streaming是Spark软件栈中的一个用于流计算的组件
在2014年发布的Spark1.0版本中,Spark Streaming已经包含在Spark软件栈中
它基于Spark的核心批处理计算框架,通过将数据流沿时间轴分成不同的片段,然后交由Spark对不同片段的数据进行批处理来实现流式计算
所以,从严格意义上来说,Spark Streaming实现的并不是流式计算,具有一定的时间延迟,无法做到毫秒级的响应
但是由于Spark处理速度快,Spark Streaming也能够胜任和满足许多场景下的流计算需求
Spark streaming的wordCount案例
以Socket为数据源
socket通常翻译为套接字
socket封装了网络中计算机的IP地址与端口
serverIP:serverPort, 比如 localhost:9999
监控端口,以端口的数据为数据源
以文件目录为数据源
监控文件目录
以文件目录中新增的文件为数据源
pom
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.liu</groupId>
<artifactId>socketSparkStreaming</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- 对spark core的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<!-- 对spark streaming的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
</project>
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SocketWordCount {
def main(args :Array[String]) = {
//创建一个streamingContext对象,在本地运行,两个线程
//设置划分数据流为片段的时间间隔为20秒
val sc = new StreamingContext("local[2]", "socketWordCount", Seconds(20) )
// 创建一个数据流对象,连接到serverIP:serverPort, 比如 localhost:9999
val lines = sc.socketTextStream("localhost", 9999)
//将输入数据流中的每一行以空格为分隔符分割为单词
val words = lines.flatMap(line=>line.split(" "))
//统计一个时间片内的单词个数
val wordCounts = words.map(word => (word, 1)).reduceByKey((a,b)=>a+b)
//将每个时间片中的前10个单词打印到控制台
wordCounts.print()
//输出到本地以wordcount为前缀文件名的文件中
wordCounts.saveAsTextFiles("wordcount")
//启动JobScheduler,开始执行应用
sc.start()
sc.awaitTermination()
}
}