MapReduce
1.MapReduce概述
MapReduce是Hadoop数据分析的核心,是一个分布式的编程框架。
2.MapReduce的优缺点
优点
- MapReduce适合PB级以上的海量数据的运算。
- 扩展新高:如果当前集群计算资源有限,可以通过扩展集群的方式提高集群性能。
- 高容错性:如果一台正在运行MR程序的服务器挂了,Hadoop会自动的将该节点上的MR程序转移到正常运行的服务器完成运行。
- MR程序编程简单,只需要实现简单的接口即可。
缺点
- MapReduce不擅长计算流式数据,因为流式数据都是动态的,而Hadoop设计的初衷是运算静态数据。
- 不擅长运算有向无环数据(Directed Acyclic Graph,简称DAG),也可以称为迭代式运算。因为DAG要求将上一次运算的输出结果当作下一次的运算的输入结果。MR程序一次只能存在一个Map阶段和Reduce阶段,如果完成DAG只能编写多个MR程序串行计算,效率极低。
- 不擅长实时计算,比如MysqL那种毫秒级的计算。
3.实现MR程序
MR的核心功能就是将用户的业务需求和MR自带的组件整合成为一个分布式的MR程序。最后将MR程序投入到Hadoop中进行并发运行。
1.Hadoop中的数据类型
Hadoop中定义了自己的一套数据类型体系,与Java数据类型的对照如下:
Java Hadoop
long LongWritable
int IntWritable
String Text
boolean BooleanWritable
null NullWritable
... ...
2.序列化和反序列化
Java中的序列化机制十分完善,但是封装好的序列化信息除了需要的数据之外,还包括其他的信息,这些信息在做大数据处理时并不需要,所以Hadoop又实现了一套自己的序列化和反序列化机制(Writable接口
)。
-
序列化:
为了将内存中的对象持久性的保存到磁盘中,或是传到网络上传输,需要将内存中的对象实现序列化,转换成字节流放在磁盘中保存或网络传输。
-
反序列化:
反序列化就是将磁盘中的字节信息或网络传输的字节流还原成内存中的对象。
1.实现序列化的方式
Hadoop中设计了自己的序列化接口Writable
,实现自定义序列化类时,只需要实现这个接口并重写接口中的序列化和反序列化方法即可。
void write(DataOutput out) throws IOException; //序列化--输出流
void readFields(DataOutput in) throws IOException; //反序列化--输出流
2.Hadoop序列化的特点
- 结构紧凑,没有多余的信息,高效的使用存储空间
- 读写速度快,传输效率高
- 支持多语言平台操作
3.自定义MR程序
4.MR框架
1.MR的核心思想
将输入数据切片,切片后的数据输入到Mapper阶段执行运算,再将Mapper阶段的结果当作Reducer阶段的输入数据,最后输出最终结果。
1.切片
切片是按照逻辑分片的理念,将数据块保存的数据按照一定大小在内存中切片,并没有真正的将数据块再次切分。切片大小默认与数据块的大小(BlockSize)相同(128MB),但是可以根据服务器的实际运算性能调整。
2. MR的执行阶段
- 一个MR进程至少分成2个阶段:MapTask(MT)、ReduceTask(RT)。
- 切片的数量 = MT的数量,即切成几个数据片就有几个MT。
- RT的数量 = 业务需求,即:如果业务要求将数据分成两部分,那么就会有两个RT。
- MR程序一次只能存在一个Map阶段和Reduce。如果业务逻辑复杂,就需要编写多个MR程序穿行执行,比如实现DAG。
2.MR进程包括那些
MR进程包括三个部分:
-
MrAppMaster
:负责整个MR进程的协调和监管。 -
MapTask
:负责Map阶段的数据处理。 -
ReduceTask
:负责Reduce阶段的数据处理,并输出最后接口。
3.MR程序运行流程
简单分析:
InputFormat —> Mapper阶段 —> Reducer阶段 —> OutFormat
详细过程:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LlK54GqW-1661263561462)(E:\笔记\hadoop\图片\image-20220815134313523.png)]
MR程序可以分为两个阶段
:
- 提交Job,对当前Job需要的MT、RT进行初始化
- 执行Job,开始并发执行MT,之后执行RT
4.InputFormat组件
InputFormat在Map阶段前执行,负责数据的读入和切片。InputFormat是一个抽象类,具体的切片工作由他的默认实现类FileInputFormat
完成。
FileInputFormat
中声明了==getSplit()==方法完成数据切片。切片与MT关系如下图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SevhzkKp-1661263561463)(E:\笔记\hadoop\图片\image-20220811195610296.png)]
切片机制:
- 按照数据内容长度进行切片,并且是有序切片,从头到尾
- 切片时并不是按照数据的整体进行切分,而是按照单独的文件进行切分
- 切片大小默认是数据块大小,可以根据计算性能调整
- 对零散文件、小文件切分时有专门的解决策略---->
CombineTextInputFormat
切片数量决定了MT人物的数量
切片源码分析
切片是操作发生在Map阶段之前,数据输入之后,由此可以确定切片操作一定发生在作业提交阶段。
从提交作业的程序跟进,最终发现切片操作被声明在InputFormat类中,具体的切片实现方式在InputFormat的子类FileInputFormat中。
切片操作逻辑分析
:
- 首先判断是否忽略数据包中的迭代的文件夹,默认情况下忽略,如果没有忽略会将文件夹下的数据取出进行切片,忽略情况下该文件夹下的数据会被视为一个整体。
- 其次获取文件的信息和数据长度。判断整个数据包是否能够被切片,部分压缩算法的数据不支持切片操作。如果能够被切再进行切片,如果不支持切片,则将整个数据分配个1个MT任务完成。
- 如果支持切片,再判断切片的大小,默认情况下切片大小等于数据块大小。为的是避免跨服务器操作数据。
- 最后开始进行切片,如果剩余数据量 / 切片大小 > 1.1,则继续将剩余数据切片,否则不切片,将剩余数据分配个一个MT。
小文件切片逻辑分析
:
HDFS对小文件十分敏感,如果要处理的数据是小文件,则按照小文件的切片方式进行切片----CombineTextInputFormat,是FileInputFormat的子类。
- 首先获取小文件信息,设置预设切片阈值,之后进行逻辑分块。
- 如果个小文件的大小 < 阈值,则将这个文件分为一个块。
- 如果文件大小 > 阈值,但是小于2倍的阈值,则将此文件平均分为两个块。
- 如果文件大小 > 2倍的阈值,则将此文件按照阈值大小先分为一块,剩余数据按照步骤3继续判断。
- 最后将所有分块的数据合并,如果逻辑块 >= 阈值,则分为一块,如果不满足则与下一个块合并,并且再次与阈值比较,直到符合要求。
5.shuffle工作流程
shuffle发生在map阶段之后,reduce阶段之前。此过程是将MT处理过的数据重新进行整合,然后输出到reduce阶段
。
工作流程
:
- shuffle过程中创建了一个环形的数据缓冲区,数据从MT读取到环形缓冲区,环形区一边写入真实数据,另一边写入数据的分区号等信息。
- 当缓冲区中读取的数据达到==80%==后,环形区会将数据按照分区将数据溢写到磁盘,并且分区内进行排序。写出数据的同时,缓冲区也会反向读入数据。
- 数据分区写出后,可选是否执行combiner对数据进行分区归并、是否压缩输出数据。
- 之后将数据按照分区合并、排序,之后按照相同的key分组,输出到reduce阶段。
由此可以看出:
6.Join应用
1.Reduce Join
在Reduce阶段,将不同表的数据按照关联字段进行连接,并且做好数据来源的区分。
实现Reduce Join:
- 在Map阶段读使用==setup()==方法获取数据,并对数据进行相应的读流、数据切割等初始化操作。
- 在map方法中对数据进行读取、标记数据来源等操作,最后将数据封装输出。
- reduce阶段按照不同数据来源的数据将结果输出。
2.Map Join
如果在Reduce阶段处理的表过多,容易造成数据倾斜。可以通过在Map阶段时,向内存中缓存多张小表
,提前处理一部分逻辑业务,这样既可以减小Reduce阶段的工作压力,也可以尽量避免数据倾斜。
注意
:Map Join只适合存在小表的业务逻辑,Hive中对小表的定义是25MB,Hadoop中需要靠经验分析来确定小表。
实现Reduce Join:
7.数据压缩
压缩使用在MR阶段提高运算速度的一种方式。
1.压缩的原则
- 运算密集型的Job,少用压缩
- IO密集型的Job,多用压缩
2.压缩的特点
压缩可以减少IO空间,节省磁盘容量。但是压缩会增加cpu开销。
3.MR的压缩格式
压缩格式 | Hadoop自带? | 算法 | 文件扩展名 | 是否可切片 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 是,直接使用 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
性能比较:
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
- | -------- | -------- |
| gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
| bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
| LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |