Delta Lake Test Cases

Reference: the earlier official-documentation survey post (link)

In this post I write a few test cases to exercise Delta Lake's features. I use sbt to create the project, and the data is written to my local disk.

I. Creating the project

Component versions:
sbt 1.4.2
Scala 2.12.10
Spark 3.0.0
Delta Lake 0.7.0

The build.sbt file:

name := "DaltaLake"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "io.delta" %% "delta-core" % "0.7.0"
libraryDependencies += "log4j" % "log4j" % "1.2.17"

II. Test cases

1. Manually creating data

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkDataInot {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Test"))

    // getOrCreate() reuses the SparkContext created above
    val spark = SparkSession.builder().getOrCreate()
    val rdd = sc.makeRDD(Seq(1, 2, 3, 4))

    import spark.implicits._

    val df = rdd.toDF("id")

    df.write.format("delta").save("/Users/smzdm/Documents/data/0")
    // For local testing this also works; the table is then stored under the project directory:
    // df.write.format("delta").save("DeltaTable")

    df.show()
    sc.stop()
    spark.stop()
  }
}
-----------------------------------
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+

Contents of _delta_log/00000000000000000000.json in the table directory:

{"commitInfo":{"timestamp":1611823662796,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputBytes":"2007","numOutputRows":"4"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"MetaData":{"id":"0b3ba27f-6a52-48e4-b9c4-6a1c3818a25c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"Metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1611823659077}}
{"add":{"path":"part-00000-cd0851fa-df9c-48c9-b94e-24ad9bfa2b33-c000.snappy.parquet","partitionValues":{},"size":299,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00001-a098f9cb-ceaa-4ef7-aa5e-10ba6f90fe7f-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00003-f62f5dac-6fe7-4cee-bf43-b1488996db0e-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00005-4de1c7f7-d399-483b-bc7f-77e7b21dd017-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00007-17f127f0-f378-4a74-9b92-97ac65f57c23-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}

Every subsequent write to the table produces another commit JSON like the one below; I won't repeat it for every case.

{"commitInfo":{"timestamp":1611828204517,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":3,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"886","numOutputRows":"4"}}}
{"add":{"path":"part-00000-cb4cdcc9-5e71-4f45-ae38-01c1a526e3c4-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611828204000,"dataChange":true}}
{"remove":{"path":"part-00000-9374a554-221f-4fc6-8197-486cc61d9dc5-c000.snappy.parquet","deletionTimestamp":1611828204512,"dataChange":true}}

2. Updating a Delta table's schema (plain Spark can also evolve a schema)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/**
 * Test case: add new columns (schema evolution)
 */
object DeltaSchema {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val df = spark.read.format("delta").load("/Users/smzdm/Documents/data/0")

    val df1 = df.withColumn("rn", row_number().over(Window.orderBy("id")))
      .withColumn("flag", when(col("rn") === lit(1), lit(1)).otherwise(lit(0)))

    df1.show()

    // mergeSchema=true lets the write add the new columns to the table schema
    df1.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 2| 2| 0|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+

Contents of _delta_log/00000000000000000001.json:

{"commitInfo":{"timestamp":1611824104223,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"886","numOutputRows":"4"}}}
{"MetaData":{"id":"0b3ba27f-6a52-48e4-b9c4-6a1c3818a25c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"Metadata\":{}},{\"name\":\"rn\",\"type\":\"integer\",\"nullable\":true,\"Metadata\":{}},{\"name\":\"flag\",\"type\":\"integer\",\"nullable\":true,\"Metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1611823659077}}
{"add":{"path":"part-00000-5f96937a-30aa-4e23-af03-f0679367ca7c-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611824103000,"dataChange":true}}
{"remove":{"path":"part-00001-a098f9cb-ceaa-4ef7-aa5e-10ba6f90fe7f-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00003-f62f5dac-6fe7-4cee-bf43-b1488996db0e-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00000-cd0851fa-df9c-48c9-b94e-24ad9bfa2b33-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00007-17f127f0-f378-4a74-9b92-97ac65f57c23-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00005-4de1c7f7-d399-483b-bc7f-77e7b21dd017-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}

3. Updating data

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import io.delta.tables._

/**
 * Test case: update
 */
object DeltaUpdate {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("update")
      .master("local[*]").getOrCreate()

    import spark.implicits._
    // load the path as a DeltaTable
    val ddf = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")

    // update(where condition, Map(column to update -> update expression))
    ddf.update(col("id") > 2, Map("id" -> (col("id") + 10)))

    val df = ddf.toDF

    df.show()

    // note: update() has already committed the change; this extra overwrite only creates one more version
    df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 2| 2| 0|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+

Contents of _delta_log/00000000000000000002.json:

{"commitInfo":{"timestamp":1611828239222,"operation":"UPDATE","operationParameters":{"predicate":"(id#395 > 2)"},"readVersion":4,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numAddedFiles":"1","numUpdatedRows":"2","numcopiedRows":"2"}}}
{"remove":{"path":"part-00000-cb4cdcc9-5e71-4f45-ae38-01c1a526e3c4-c000.snappy.parquet","deletionTimestamp":1611828238847,"dataChange":true}}
{"add":{"path":"part-00000-cf10bc40-bd4b-4a41-9897-2fc0fe2cfe09-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611828239000,"dataChange":true}}

4. Deleting data

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object DeltaDelete {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("delete")
      .master("local[*]").getOrCreate()

    // load the path as a DeltaTable
    val ddf = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")

    ddf.delete(col("id") === 2)

    val df = ddf.toDF

    df.show()

    // note: delete() has already committed the change; this extra overwrite only creates one more version
    df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}

-----------------------------------

+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+

Contents of _delta_log/00000000000000000003.json:

{"commitInfo":{"timestamp":1611828364666,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` = 2)\"]"},"readVersion":6,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numDeletedRows":"1","numAddedFiles":"1","numcopiedRows":"3"}}}
{"remove":{"path":"part-00000-3067922b-8702-49df-a903-aa778462d95b-c000.snappy.parquet","deletionTimestamp":1611828364655,"dataChange":true}}
{"add":{"path":"part-00000-e2f90118-2525-4614-8015-42c2c2545018-c000.snappy.parquet","partitionValues":{},"size":876,"modificationTime":1611828364000,"dataChange":true}}

5. Table history and time travel

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

/**
 * View the table history
 * Read data as of a historical version
 * Vacuum old versions
 */
object DeltaDataVersion {
  def main(args: Array[String]): Unit = {

    // version to travel back to
    val versionCode = 1
    // val versionCode = args(0).trim

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("delete")
      .master("local[*]").getOrCreate()

    val df = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")

    // show the commit history
    df.history().toDF().show(false)

    // time travel by version number (versionAsOf) or by timestamp (timestampAsOf)
    val dff = spark.read.format("delta").option("versionAsOf", versionCode).load("/Users/smzdm/Documents/data/0")
    // val dff = spark.read.format("delta").option("timestampAsOf", "2021-01-28 18:03:59").load("/Users/smzdm/Documents/data/0")

    // vacuum files older than 100 hours; vacuum() with no argument uses the default 7-day retention
    // note: a retention shorter than the default 168-hour safety check may require
    // spark.databricks.delta.retentionDurationCheck.enabled=false
    df.vacuum(100)

    dff.toDF.show()

    spark.stop()

  }
}
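Delta 0.7.0 has no RESTORE command, so a common way to roll a table back is to read the old snapshot with versionAsOf and overwrite the current table with it. A sketch, assuming version 1 is the target version:

import org.apache.spark.sql.SparkSession

object DeltaRollback {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rollback").getOrCreate()

    // read the snapshot of version 1 ...
    val oldVersion = spark.read.format("delta")
      .option("versionAsOf", 1)
      .load("/Users/smzdm/Documents/data/0")

    // ... and overwrite the current table with it; this adds a new commit,
    // so the history and forward time travel are preserved
    oldVersion.write.format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true") // in case the schema changed between versions
      .save("/Users/smzdm/Documents/data/0")

    spark.stop()
  }
}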

6. Merging two tables

The merge tests took the longest and hit the most edge cases, so the numbers shown here may not be fully accurate.
The automatic schema-evolution mode mentioned in the previous post is also tested here. A few things to note:
1. A Cartesian-product style match (several source rows matching the same target row) throws an error; the source data must be pre-processed first (see the de-duplication sketch after the code below).
2. Enable automatic schema evolution only if your use case needs it.
3. The merge is triggered by calling execute(); there is no need to write the result back to the table.

import io.delta.tables._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Merge operations between two tables
 * merge + update: used with whenMatched(condition); only updates the matched rows
 * merge + insert: used with whenNotMatched(condition); inserts the unmatched rows
 *
 * Combined: merge ... update ... insert gives "update if present, insert if absent"
 *
 * Tested:
 * update: works
 * insert: works
 * Cartesian-product match: fails; the source keys need to be de-duplicated first
 * merge between tables with different schemas: tested with both the default mode and automatic schema evolution
 *
 */

object DeltaMerge {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.databricks.delta.schema.autoMerge.enabled", true) // optional: enable automatic schema evolution
      .appName("merge")
      .master("local[*]")
      .getOrCreate()

    val ddf = DeltaTable.forPath(spark, "DeltaTable").toDF.withColumn("rn", lit(1)).withColumn("flag", lit(100))
    val ddf1 = DeltaTable.forPath("/Users/smzdm/Documents/data/0")

    // DeltaTable.forName("table_name") // tables registered in the metastore can also be loaded by name

    val df = ddf.withColumn("rn", col("rn") + 2)
      .withColumn("A", lit("a"))
      .withColumn("B", lit("B"))

    print("=============t====================")
    ddf.toDF.show()
    print("----------------t1-----------------")
    ddf1.toDF.show()
    print("==============df===================")
    df.show()

    // merge + updateExpr: used with whenMatched(condition), Map(target column -> source expression)
    ddf1.as("t1")
      .merge(ddf.as("t"), "t.id=t1.id")
      .whenMatched("t.id=4")
      .updateExpr(Map("t1.rn" -> "t.rn")) // only update the rows that match the condition
      //.updateAll()  // or update every matched row with all source columns
      .execute()

    // merge + insertExpr: used with whenNotMatched(condition), Map(target column -> source expression)
    ddf1.as("t1")
      .merge(ddf.as("t"), "t.id=t1.id")
      .whenNotMatched("t.id=4") // optional extra condition
      .insertExpr(Map("t1.rn" -> "t.rn")) // insert source rows whose id has no match in the target
      // .whenNotMatched()
      // .insertAll() // or insert all columns of every unmatched row
      .execute()

    // Cartesian-product test: this throws. As the docs point out, the keys must be pre-processed,
    // because multiple source rows matching one target row make it ambiguous which update to apply.
    ddf1.as("t1")
      .merge(df.as("t"), "t.id=t1.id")
      .whenMatched()
      .updateAll()
      .whenNotMatched()
      .insertAll()
      .execute()

    ddf1.toDF.show()

    spark.stop()
  }
}
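The failing merge above is the ambiguous-match case from note 1: several source rows share the same id. De-duplicating the source on the join key before merging avoids it. A sketch that continues the example above (ddf1 is the target DeltaTable, df the multi-match source; keeping the row with the largest rn is an arbitrary choice):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// keep a single source row per join key so every target row matches at most once
val dedupedSource = df
  .withColumn("rk", row_number().over(Window.partitionBy("id").orderBy(col("rn").desc)))
  .where(col("rk") === 1)
  .drop("rk")

ddf1.as("t1")
  .merge(dedupedSource.as("t"), "t.id = t1.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()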

7. Streaming and batch on the same table

Still testing...
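While that test is still in progress, here is a minimal sketch of the kind of job it covers: a streaming append and a batch read against the same Delta path. The path /Users/smzdm/Documents/data/stream and the checkpoint location are hypothetical, and the rate source is just a stand-in for real input:

import org.apache.spark.sql.SparkSession

object DeltaStreamBatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("stream-batch").getOrCreate()

    // streaming writer: append the built-in rate source into a Delta table
    val query = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
      .selectExpr("CAST(value AS INT) AS id")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "/Users/smzdm/Documents/data/stream_ckpt")
      .outputMode("append")
      .start("/Users/smzdm/Documents/data/stream")

    // let a few micro-batches commit, then read the same path as a plain batch table
    query.awaitTermination(30000)
    spark.read.format("delta").load("/Users/smzdm/Documents/data/stream").show()

    query.stop()
    spark.stop()
  }
}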

Summary

To be continued...
