大数据高级开发工程师——Spark学习笔记5

Spark内存计算框架

Spark Core

Spark读写HBase

  • 我们可以通过 Spark 整合 HBase,实现通过 Spark 来读取 HBase 的数据。
  • 数据准备:创建 HBase 表,并插入数据:
create 'spark_hbase','info'
put 'spark_hbase','0001','info:name','tangseng'
put 'spark_hbase','info:age','30'
put 'spark_hbase','info:sex','0'
put 'spark_hbase','info:addr','beijing'
put 'spark_hbase','0002','sunwukong'
put 'spark_hbase','508'
put 'spark_hbase','shanghai'
put 'spark_hbase','0003','zhubajie'
put 'spark_hbase','715'
put 'spark_hbase','shenzhen'
put 'spark_hbase','0004','bailongma'
put 'spark_hbase','1256'
put 'spark_hbase','donghai'
put 'spark_hbase','0005','shaheshang'
put 'spark_hbase','1008'
put 'spark_hbase','tiangong'

在这里插入图片描述

  • 创建输出 HBase 数据
create 'spark_hbase_out','info'

1. 通过newAPIHadoopRDD实现

  • 添加 pom.xml 依赖
<repositories>
  <!-- spark on hbase是cloudera提供的,所以这个地方添加了cdh仓库地址 -->
  <repository>
    <id>cloudera</id>
    <!--      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>-->
    <url>https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark</url>
  </repository>
</repositories>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.3</version>
</dependency>

<!-- hadoop-core 2009、7月更名为hadoop-common -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
<!--      <version>2.6.0-mr1-cdh5.14.2</version>-->
  <version>3.1.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
<!--      <version>1.2.0-cdh5.14.2</version>-->
  <version>2.2.6</version>
</dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.hbase</groupId>-->
<!--      <artifactId>hbase-common</artifactId>-->
<!--      <version>1.2.0-cdh5.14.2</version>-->
<!--&lt;!&ndash;      <version>2.2.6</version>&ndash;&gt;-->
<!--    </dependency>-->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
<!--      <version>1.2.0-cdh5.14.2</version>-->
  <version>2.2.6</version>
</dependency>
<!-- 添加此依赖,解决spark hbase集成,低版本时报Base64找不到的问题 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-mapreduce</artifactId>
  <version>2.2.6</version>
</dependency>
<!-- 为了让它与apache hbase 2.2.2兼容,使用cdh6.2版本的;具体dependency可以去https://mvnrepository.com/查找 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-spark</artifactId>
<!--      <version>1.2.0-cdh5.14.2</version>-->
  <version>2.1.0-cdh6.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.3.3</version>
</dependency>
  • 代码实现:
object Case09_SparkWithHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 1. 创建HBase的环境参数
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")

//    // 2. 设置过滤器,还可以设置起始和结束rowkey
//    val scan = new Scan
//    scan.setFilter(new RandomRowFilter(0.5f))
//    // 设置scan对象,让filter生效(序列化)
//    hbaseConf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    // 3. 读取HBase数据,生成RDD
    val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    resultRDD.foreach(x => {
      // 查询出来的结果集存在 (ImmutableBytesWritable,Result)第二个元素
      val result = x._2
      // 获取行键
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    })

    // 4. 向HBase表写入数据
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "spark_hbase_out")
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // 5. 封装输出结果 resultRDD: RDD[(ImmutableBytesWritable,Result)]
    val outRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.mapPartitions(eachPartition => {
      eachPartition.map(keyAndEachResult => {
        val result = keyAndEachResult._2
        val rowKey = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
        val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))

        val put = new Put(Bytes.toBytes(rowKey))
        val immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
        // 向 HBase 插入数据,需要 rowKey 和 put 对象
        (immutableBytesWritable, put)
      })
    })

    // 6. 调用API Output the RDD to any Hadoop-supported storage system with new Hadoop API
    outRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}

在这里插入图片描述

2. 通过spark on hbase实现

  • 参考资料:
    • https://github.com/cloudera-labs/SparkOnHBase/blob/cdh5-0.0.2/src/main/scala/com/cloudera/spark/hbase/HBaseContext.scala
    • https://issues.apache.org/jira/browse/HBASE-13992
    • https://github.com/cloudera-labs/SparkOnHBase
  • 优势:
    • 无缝的使用 HBase Connection
    • 和 Kerberos 无缝集成
    • 通过 get 或 scan 直接生成 RDD
    • 利用 RDD 支持 HBase 的任何组合操作
    • 为通用操作提供简单的方法,同时通过 API 允许不受限制的未知高级操作
    • 支持 Java 和 Scala
    • 为 Spark 和 Spark Streaming 提供相似的 API
  • 由于 hbaseContext 是一个只依赖 hadoop、hbase、spark 的 jar 包的工具类,因此可以拿过来直接用
  • 添加依赖包:
<!-- 为了让它与apache hbase 2.2.2兼容,使用cdh6.2版本的;具体dependency可以去https://mvnrepository.com/查找 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-spark</artifactId>
  <version>2.1.0-cdh6.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.3.3</version>
</dependency>
  • 代码实现:
object Case10_SparkOnHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "spark_hbase")

    val hbaseContext = new HBaseContext(sc, hbaseConf)
    val scan = new Scan()

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan)

    hbaseRDD.map(eachResult => {

      val result: Result = eachResult._2
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    }).foreach(println)
    sc.stop()
  }
}

Spark的序列化与反序列化

1. transformation操作为什么需要序列化

  • Spark 是分布式执行引擎,其核心抽象是弹性分布式数据集 RDD,其代表了分布在不同节点的数据。
  • Spark 的计算是在 Executor 上分布式执行的,故用户开发的关于 RDD 的 map、flatMap、reduceByKey 等 transformation 操作(闭包)有如下执行过程:
    • ① 代码中对象在 Driver 本地序列化;
    • ② 对象序列化后传输到远程 Executor 节点;
    • ③ 远程 Executor 节点反序列化对象;
    • ④ 最终远程节点执行。
  • 故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。

2. spark的任务序列化异常

  • 在编写 spark 程序中,由于在 map、foreachPartition 等算子内部使用了外部定义的变量和函数,从而引发 Task 未序列化问题。
  • 然而 spark 算子在计算过程中使用外部变量在许多情形下确实在所难免:
    • 比如在 filter 算子根据外部指定的条件进行过滤;
    • map根据相应的配置进行变换。
  • 经常会出现“org.apache.spark.SparkException: Task not serializable”这个错误
    • 其原因就在于这些算子使用了外部的变量,但是这个变量不能序列化。
    • 当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现 Task 未序列化问题。

在这里插入图片描述

在这里插入图片描述

3. spark中解决序列化的方法

  • 如果函数中使用了该类对象,该类要实现序列化:类 extends Serializable
  • 如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化。
  • 对于不能序列化的成员变量使用==“@transient”==标注,告诉编译器不需要序列化。
  • 也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
  • 可以把对象的创建直接在该函数中构建这样避免需要序列化。

在这里插入图片描述

在这里插入图片描述

4. kyro序列化

  • 在分布式应用中,经常会进行IO操作,传递对象,而网络传输过程中就必须要序列化。
  • Java序列化可以序列化任何类,比较灵活,但是相当慢,并且序列化后对象的提交也比较大。
  • Spark 出于性能考虑,在 2.0 以后,开始支持 kryo 序列化机制,速度是 Serializable 的 10 倍以上,当 RDD 在 Shuffle 数据的时候,简单数据类型,简单数据类型数组,字符串类型已经使用 kryo 来序列化。
  • 也可以通过 kyro 对我们需要序列化的对象,进行序列化标价
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
  • 举个例子:
case class MySearcher(val query: String) {
  def getMatchRddByQuery(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

object Case11_Kyro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      // 替换默认序列化机制
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // 注册需要使用kryo序列化自定义类
      .registerKryoClasses(Array(classOf[MySearcher]))
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hadoop yarn", "hadoop hdfs", "c"))
    val rdd2: RDD[String] = MySearcher("hadoop").getMatchRddByQuery(rdd1)
    rdd2.foreach(println)
  }
}

数据倾斜原理和现象分析

1. 数据倾斜概述

  • 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时 Spark 作业的性能会比期望差很多。
  • 数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证 Spark 作业的性能。

2. 数据倾斜发生时的现象

  • ① 绝大多数task执行得都非常快,但个别 task 执行极慢
    • 你的大部分的 task,都执行的特别快,很快就执行完了,剩下几个 task,执行的特别特别慢;
    • 前面的 task,一般 10s 可以执行完5个,最后发现某个task,要执行 1 个小时、2 个小时才能执行完一个 task;
    • 这个时候就出现数据倾斜了。这种方式还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。
  • ② 绝大数 task 执行很快,有的 task 直接报OOM (Jvm Out Of Memory) 异常
    • 运行的时候,其他 task 都很快执行完了,也没什么特别的问题;
    • 但是有的 task,就是会突然间报了一个 OOM,内存溢出了,task failed、task lost、resubmitting task等日志异常信息。
    • 反复执行几次某个 task 就是跑不通,最后就挂掉。
    • 某个 task 就直接 OOM,那么基本上也是因为数据倾斜了,task 分配的数量实在是太大了!!!所以内存放不下,然后你的 task 每处理一条数据,还要创建大量的对象。内存爆掉了。

3. 数据倾斜发生的原理

在这里插入图片描述

  • 如上图所示:在进行任务计算 shuffle 操作的时候,第一个 task 和第二个 task 各分配到了 1 万条数据;需要 5 分钟计算完毕;第三个 task要 98万 条数据,98 * 5 = 490分钟 = 8个小时;
  • 本来另外两个 task 很快就运行完毕了(5分钟),第三个task数据量比较大,要 8 个小时才能运行完,就导致整个 spark 作业,也得 8 个小时才能运行完。最终导致整个 spark 任务计算特别慢。

4. 数据倾斜如何定位原因

  • 主要是根据log日志信息去定位:
    • 数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
    • 出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。因为某个或者某些 key 对应的数据,远远的高于其他的key。
  • 分析定位逻辑:
    • 由于代码中有大量的 shuffle 操作,一个 job 会划分成很多个 stage
    • 首先要看的,就是数据倾斜发生在第几个 stage 中。
    • 可以在任务运行的过程中,观察任务的 UI 界面,可以观察到每一个 stage 中运行的 task 的数据量,从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。
    • 比如下图中,倒数第三列显示了每个 task 的运行时间。明显可以看到,有的 task 运行特别快,只需要几秒钟就可以运行完;而有的 task 运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。
    • 此外,倒数第一列显示了每个 task 处理的数据量,明显可以看到,运行时间特别短的 task 只需要处理几百 KB 的数据即可,而运行时间特别长的 task 需要处理几千 KB 的数据,处理的数据量差了 10 倍。此时更加能够确定是发生了数据倾斜。

在这里插入图片描述

  • 某个task莫名其妙内存溢出的情况
    • 这种情况下去定位出问题的代码就比较容易了。
    • 建议直接看 yarn-client 模式下本地 log 的异常栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有 shuffle 类算子,此时很可能就是这个算子导致了数据倾斜。
    • 但是需要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的 bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过 Spark Web UI 查看报错的那个 stage 的各个 task 的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。
  • 查看导致数据倾斜的key的数据分布情况
    • 知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了 shuffle 操作并且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。
    • 这主要是为之后选择哪一种技术方案提供依据。针对不同的 key 分布与不同的 shuffle 算子组合起来的各种情况,可能需要选择不同的技术方案来解决。
    • 此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:
      • ① 如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。
      • ② 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如RDD.countByKey()。
    • 然后对统计出来的各个key出现的次数,collect/take 到客户端打印一下,就可以看到 key 的分布情况。
    • 举例来说,对于上面所说的单词计数程序,如果确定了是 stage1 的 reduceByKey 算子导致了数据倾斜,那么就应该看看进行reduceByKey 操作的 RDD 中的 key 分布情况,在这个例子中指的就是 pairs RDD。
    • 如下示例,我们可以先对 pairs 采样 10% 的样本数据,然后使用 countByKey 算子统计出每个 key 出现的次数,最后在客户端遍历和打印样本数据中各个 key的出现次数。
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

5. 数据倾斜原因总结

  • 数据本身问题
    • ① key 本身分布不均衡(包括大量的key为空)
    • ② key 的设置不合理
  • spark使用不当的问题
    • ① shuffle 时的并发度不够
    • ② 计算方式有误

6. 数据倾斜的后果

  • spark 中的 stage 的执行时间受限于最后那个执行完成的 task,因此运行缓慢的任务会拖垮整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。
  • 过多的数据在同一个task中运行,将会把 executor 内存撑爆,导致 OOM 内存溢出。

spark中数据倾斜的解决方案

1. 解决方案一:使用Hive ETL预处理数据

  • 适用场景:导致数据倾斜的是 Hive 表。如果该 Hive 表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用 Spark 对 Hive 表执行某个分析操作,那么比较适合使用这种技术方案。
  • 实现思路:此时可以评估一下,是否可以通过 Hive 来进行数据预处理(即通过 Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行 join),然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的 Hive 表。此时由于数据已经预先进行过聚合或 join 操作了,那么在 Spark 作业中也就不需要使用原先的 shuffle 类算子执行这类操作了。
  • 实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在 Spark 中执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以 Hive ETL 中进行 group by 或者 join 等 shuffle 操作时,还是会出现数据倾斜,导致 Hive ETL 的速度很慢。我们只是把数据倾斜的发生提前到了 Hive ETL 中,避免 Spark 程序发生数据倾斜而已。
  • 优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark 作业的性能会大幅度提升。
  • 缺点:治标不治本,Hive ETL 中还是会发生数据倾斜。
  • 实践经验:在一些 Java 系统与 Spark 结合使用的项目中,会出现 Java 代码频繁调用 Spark 作业的场景,而且对 Spark 作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的 Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次 Java 调用 Spark 作业时,执行速度都会很快,能够提供更好的用户体验。
  • 项目经验:有一个交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过 Java Web 系统提交数据分析统计任务,后端通过 Java 提交 Spark 作业进行数据分析统计。要求 Spark 作业速度必须要快,尽量在 10 分钟以内,否则速度太慢,用户体验会很差。所以我们将有些 Spark 作业的 shuffle 操作提前到了 Hive ETL 中,从而让 Spark 直接使用预处理的 Hive 中间表,尽可能地减少 Spark 的 shuffle 操作,大幅度提升了性能,将部分作业的性能提升了 6 倍以上。

在这里插入图片描述

2. 解决方案二:过滤少数导致倾斜的key

  • 适用场景:如果发现导致倾斜的 key 就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如 99% 的 key 就对应 10 条数据,但是只有一个 key 对应了 100 万数据,从而导致了数据倾斜。
  • 实现思路:如果我们判断那少数几个数据量特别多的 key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个 key。
    • 比如,在 Spark SQL 中可以使用 where 子句过滤掉这些 key 或者在 Spark Core 中对 RDD 执行 filter 算子过滤掉这些 key。
    • 如果需要每次作业执行时,动态判定哪些 key 的数据量最多然后再进行过滤,那么可以使用 sample 算子对 RDD 进行采样,然后计算出每个 key 的数量,取数据量最多的 key 过滤掉即可。
  • 实现原理:将导致数据倾斜的 key 给过滤掉之后,这些 key 就不会参与计算了,自然不可能产生数据倾斜。
  • 优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。
  • 缺点:适用场景不多,大多数情况下,导致倾斜的 key 还是很多的,并不是只有少数几个。
  • 实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天 Spark 作业在运行的时候突然 OOM 了,追查之后发现,是 Hive 表中的某一个 key 在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个 key 之后,直接在程序中将那些 key 给过滤掉。

3. 解决方案三:提高shuffle操作的并行度(效果差)

  • 适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。
  • 实现思路:在对 RDD 执行 shuffle 算子时,给 shuffle 算子传入一个参数,比如 reduceByKey(_ + _,1000),该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。
  • 实现原理:增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个 task 就分配到一个key,即每个 task 就处理10条数据,那么自然每个 task 的执行时间都会变短了。具体原理如下图所示。
  • 优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
  • 缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
  • 实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

在这里插入图片描述

4. 解决方案四:两阶段聚合(局部聚合+全局聚合)

  • 适用场景:对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL 中使用 group by语句 进行分组聚合时,比较适用这种方案。
  • 实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如 (hello,1) (hello,1),就会变成(1_hello,1) (1_hello,1) (2_hello,1)。接着对打上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello,2) (2_hello,2)。然后将各个 key 的前缀给去掉,就会变成 (hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如 (hello,4)。
  • 实现原理:将原本相同的 key 通过附加随机前缀的方式,变成多个不同的 key,就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果具体原理见下图。
  • 优点:对于聚合类的 shuffle 操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将 Spark 作业的性能提升数倍以上。
  • 缺点:仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案。

在这里插入图片描述

  • 案例:如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:
// 1. 将原始的 key 转化为  随机值 + key  (随机值 = Random.nextInt)
// 2. 对数据进行 reduceByKey(func)
// 3. 将随机值+key 转成 key
// 4. 再对数据进行 reduceByKey(func),如果使用 groupBykey 会增大数据倾斜的概率

object WordCountAggTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    val sc = new SparkContext(conf)
    val array = Array("you you", "you you",
      "you you",
      "jump jump")
    val rdd = sc.parallelize(array, 8)
    rdd.flatMap(line => line.split(" "))
      .map(word => {
        val prefix = (new util.Random).nextInt(3)
        (prefix + "_" + word, 1)
      }).reduceByKey(_ + _)
      .map(wc => {
        val newWord = wc._1.split("_")(1)
        val count = wc._2
        (newWord, count)
      }).reduceByKey(_ + _)
      .foreach(wc => {
        println("单词:" + wc._1 + " 次数:" + wc._2)
      })
  }
}

5. 解决方案五:将reduce join转为map join

  • 适用场景:在对 RDD 使用 join 类操作,或者是在 Spark SQL 中使用 join 语句时,而且 join 操作中的一个 RDD 或表的数据量比较小(比如几百M或者一两G),比较适用此方案。
  • 实现思路:不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。
  • 实现原理:普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 + map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。具体原理如下图所示。
  • 优点:对 join 操作导致的数据倾斜,效果非常好,因为根本就不会发生 shuffle,也就根本不会发生数据倾斜。
  • 缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver 和每个 Executor 内存中都会驻留一份小 RDD 的全量数据。如果我们广播出去的 RDD 数据比较大,比如 10G 以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

在这里插入图片描述


在这里插入图片描述

6. 解决方案六:采样倾斜key并分拆join操作

  • 适用场景:两个 RDD/Hive 表进行 join 的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个 RDD/Hive 表中的 key 分布情况。如果出现数据倾斜,是因为其中某一个 RDD/Hive 表中的少数几个 key 的数据量过大,而另一个 RDD/Hive 表中的所有 key 都分布比较均匀,那么采用这个解决方案是比较合适的。
  • 实现思路
    • ① 对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个 key。
    • ② 然后将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的大部分 key 形成另外一个 RDD。
    • ③ 接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀,不会导致倾斜的大部分 key 也形成另外一个 RDD。
    • ④ 再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了。
    • ⑤ 而另外两个普通的 RDD 就照常 join 即可。
    • ⑥ 最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。
  • 实现原理:对于 join 导致的数据倾斜,如果只是某几个 key 导致了倾斜,可以将少数几个 key 分拆成独立 RDD,并附加随机前缀打散成 n 份去进行 join,此时这几个 key 对应的数据就不会集中在少数几个 task 上,而是分散到多个 task 进行 join 了。
  • 优点:对于 join 导致的数据倾斜,如果只是某几个 key 导致了倾斜,采用该方式可以用最有效的方式打散 key 进行 join。而且只需要针对少数倾斜 key 对应的数据进行扩容 n 倍,不需要对全量数据进行扩容。避免了占用过多内存。
  • 缺点:如果导致倾斜的 key 特别多的话,比如成千上万个 key 都导致数据倾斜,那么这种方式也不适合。

在这里插入图片描述

7. 解决方案七:使用随机前缀和扩容RDD进行join

  • 适用场景:如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义,此时就只能使用这一种方案来解决问题了。
  • 实现思路
    • ① 该方案的实现思路基本和“解决方案六”类似,首先查看 RDD/Hive 表中的数据分布情况,找到那个造成数据倾斜的 RDD/Hive 表,比如有多个 key 都对应了超过 1 万条数据。
    • ② 然后将该 RDD 的每条数据都打上一个 n 以内的随机前缀。
    • ③ 同时对另外一个正常的 RDD 进行扩容,将每条数据都扩容成 n 条数据,扩容出来的每条数据都依次打上一个 0~n 的前缀。
    • ④ 最后将两个处理后的 RDD 进行 join 即可。
  • 实现原理:将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理后的“不同key”分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜 key 对应的数据进行特殊处理,由于处理过程需要扩容 RDD,因此上一种方案扩容 RDD 后对内存的占用并不大;而这一种方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,因此只能对整个 RDD 进行数据扩容,对内存资源要求很高。
  • 优点:对 join 类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
  • 缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。
  • 实践经验:曾经开发一个数据需求的时候,发现一个 join 导致了数据倾斜。优化之前,作业的执行时间大约是 60 分钟左右;使用该方案优化之后,执行时间缩短到 10 分钟左右,性能提升了 6 倍。

相关文章

文章浏览阅读301次。你可以使用Thrift客户端来测试HBase Thr...
文章浏览阅读565次。hive和hbase数据迁移_hive转hbase
文章浏览阅读707次。基于单机版安装HBase,前置条件为Hadoop...
文章浏览阅读301次。linux集群搭建-HBase_linux中在/home目录...
文章浏览阅读933次。中没有库的概念,说一个数据说的是哪一个...
文章浏览阅读1k次,点赞16次,收藏21次。整理和梳理日常hbas...