Spark之RDD理解

RDD理解

首先在理解RDD之前,我们要知道Spark的运行流程,大致分为Standalone模式和跑在其它调度器上,如yarn和Mesos,而最常见的就是跑在yarn上,跑在yarn上还分为Client和Cluster两种模式。
区别在于:
Client模式的Driver跑在了当前本地机器上而不是集群上,当本地机器与集群机器所在地距离远时会受到网速影响较大,输出结果和运行日志直接输出到控制台上,开发中适合做调试用
Cluster模式Driver跑在集群上,当本地机器与集群机器所在地距离远时会受到网速影响较小,开发中适合用于生产环境
这里以yarn模式的Cluster Master提交方法为例

在这里插入图片描述

这是提交一个Spark任务的主要流程,而RDD及存在于Executor中

在这里插入图片描述

这里直接的表达了wordcount案列中每个RDD起到的作用,所以其实可以将RDD看作一个一个流水线上的加工点。
让我们来看下RDD比较官方的解释:
RDD(Resilient distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
其实就可以将他看作流水线上的每个加工点,RDD它是不储存数据的,数据像水流一样从RDD经过,只做加工。而每个RDD的算子其实就是这个RDD的运行逻辑,在流水线思想中,可以将算子视为这个加工点做的工作是什么。我们通过不同的RDD和RDD中的算子逻辑可以完成对数据的一系列逻辑运算
这里要注意一个问题是,RDD再创建好后不会直接运算,也就是流水线即使有了,也需要一个开始,Executor来说,行动算子就是RDD流水线开始的号角,这个后续讲算子再详细解释,反正要明白,前面的转换算子的RDD是不会执行的,直到遇到行动算子如collect()之后才会开始RDD流水线

RDD 的创建

在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
从其它RDD创建很好理解,就是其它RDD的算子返回了一个RDD
(1)那么从集合创建RDD又是什么呢,其实是Spark主要提供了两种函数:parallelize和makeRDD,后者是较好使用的一种,它的底层其实也是parallelize,只不过还加入了添加位置信息的功能
所以常用的是makeRDD,当我们获得了sc对象后就可以直接通过makeRDD创建RDD

在这里插入图片描述

这里可以添加Array,range,等Seq下的数组

在这里插入图片描述

这样就创建好了RDD
(2)从外部数据创建RDD,这个也很好理解,比如WordCount就是读取了一个文件,从而创建了RDD

在这里插入图片描述

RDD分区规则

RDD (从集合中创建)的分区规则

在这里插入图片描述


追源码

在这里插入图片描述


可以看到,在Seq参数后,还有一个Int格式的分区数,只不过这个分区数给了认值defaultParallelism,所以可以不写,而后面的withScope则是scala中的高阶函数的控制抽象,也就是在这函数外面又包装了一段代码,具体效果是我们再通过4040端口查看Spark任务时的图形化显示,没有具体逻辑意义。
再上上图中,我们并没有给numSlices赋值,所以使用它的认值defaultParallelism,这里点进去看

在这里插入图片描述


再往下点

在这里插入图片描述


到了一个抽象类中,我们通过ctrl+h来查找它的实现类,并查找重写后的这个方法

在这里插入图片描述


再点进去发现还是抽象类

在这里插入图片描述


找到它的实现类发现有两种格式,上面的是集群模式,下面的是本地模式
首先点开集群模式看一下

在这里插入图片描述


可见如果集群中没有配置参数spark.default.parallelism的话,则比较totalCoreCount和2的最大值,作为分区数
然后再点开本地模式的实现类查看方法

在这里插入图片描述


可以看见比较参数spark.default.parallelism和cpu线程数
结论:从集合创建rdd,如果不手动写分区数量的情况下,认分区数跟本地模式的cpu线程数有关,如我的cpu是4核4线程,则会生成4个分区文件
//local : 1个 local[*] : 笔记本所有核心数 local[K]:K个

以上是不写分区数,接下来是直接写分区数的方法

在这里插入图片描述


这是为什么呢
这里介绍下它的分区规则,源码中点到最后是这样的

在这里插入图片描述


所以我们可以对5个数据,三个分区输出结果进行分析

在这里插入图片描述

RDD (从文件中读取创建)的分区规则

(1)不给定分区数

在这里插入图片描述

(2)而如果给定分区数目的话,这里以这个文件做例子

在这里插入图片描述


写定分区数为3,但是出现4个分区文件,并且第一个分区文件为1,2;第二份分区文件为3,第三个分区文件为4,第四个分区文件为空
1.分区数量的计算方式:
totalSize = 10 (这份文件为10个字节)
goalSize = 10 / 3 (分区数)= 3(byte) 表示每个分区存储3字节的数据
分区数= totalSize/ goalSize = 10 /3 => 3,3,4
4子节大于3子节的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即一共有4个分区 3,3,3,1
2. Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系
3.数据读取位置计算是以偏移量为单位来进行计算的。
4.数据分区的偏移量范围的计算 (这里的两个@分别代表回车换行)
0 => [0,3] 1@@ 012 0 => 1,2
1 => [3,6] 2@@ 345 1 => 3
2 => [6,9] 3@@ 678 2 => 4
3 => [9,10] 4 9 3 => 无

那么我们再写一例加深理解,如图所示文件

在这里插入图片描述


一共是39字节,也可以数出来,并且给定分区数为4

在这里插入图片描述


结果为

在这里插入图片描述


生成了5个分区文件,并且只有第一,二,四有,三五为空
还是通过上面说的方法计算
totalSize = 39 (这份文件为39个字节)
goalSize = 39 / 4 (分区数)= 9.75(byte) 表示每个分区存储3字节的数据
因为源码中是用long修饰goalSize的所以自动转为9,那么分区数自然加一分区并且每个分区安排读的字节为9 9 9 9 3

在这里插入图片描述


右边的数为每行结束时,字节数目,分别为15,28,39.而它用的又是hadoop的filetextinput方法文件,每次读一行,也就是说第一个分区应该读【0,9】,但是会将第一行hello atguigu读完,当第二个分区读【9,18】因为读到16时就到第二行了,而每次又要读一整行,所以第二个分区将第二行数据hello spark全部读走。当第三个分区读【18,27】时,由于27并没有到第三行,而第二行又被读过一次了,所以第三个分区为空。当第四个分区读【27,36】时,到达29时就将第三行数据hello spark读走,同理,到达第五个分区时自然没东西可以读

相关文章

1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展A...
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,...