问题描述
我是 Apache Hudi 的新手,正在尝试使用 spark-shell 将我的 DataFrame 写入 Hudi 表。第一次写入时,我没有事先创建任何表,而是以覆盖(overwrite)模式写入,所以我期望它会自动创建 Hudi 表。我编写的代码如下。
# Launch spark-shell with the Hudi Spark bundle and spark-avro packages, using Kryo serialization.
spark-shell \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1
//Initialize a Spark Session for Hudi
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.SparkSession
// Local SparkSession for the Hudi write: Kryo serializer enabled and
// spark.sql.hive.convertMetastoreParquet turned off.
// The builder is finished with getOrCreate(), and the config key is spelled with its exact casing.
val spark1 = SparkSession.builder().appName("hudi-datalake").master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").config("spark.sql.hive.convertMetastoreParquet", "false").getOrCreate()
//Write to a Hudi Dataset
// Sample rows as (id, creation_date, last_update_time).
// Every tuple carries all three fields so the Seq has a single tuple type
// and lines up with the three column names passed to toDF.
val inputDF = Seq(
  ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
  ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
  ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
  ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
  ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
  ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
// Write options handed to the Hudi data source: table name and type, record key,
// partition path and precombine fields, plus the Hive sync settings.
// NOTE(review): both table names carry a "work." qualifier; confirm that a dotted
// name is accepted here rather than a separate Hive database option.
val hudioptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "work.hudi_test",
  // The table type value is written in upper case.
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "work.hudi_test",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
// Upsert Data
// Create a new DataFrame from the first row of inputDF with a different creation_date value
// Take one row of inputDF and replace its creation_date with the literal "2014-01-01".
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("2014-01-01"))
// Write through the Hudi data source in overwrite mode; the SaveMode constant is `Overwrite` (capitalized).
updateDF.write.format("org.apache.hudi").options(hudioptions).mode(SaveMode.Overwrite).saveAsTable("work.hudi_test")
执行这条写入语句时,我收到以下错误信息:
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2
有人能指导我应该如何编写这条语句吗?
解决方法
下面是针对您的问题、使用 PySpark 编写的可运行示例:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
# Assemble the session builder one setting at a time, then create (or reuse) the session.
session_builder = SparkSession.builder.appName("Hudi_Data_Processing_Framework")
session_builder = session_builder.config(
    "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
)
session_builder = session_builder.config(
    "spark.sql.hive.convertMetastoreParquet", "false"
)
# Packages listed here: the Hudi Spark bundle and spark-avro.
session_builder = session_builder.config(
    "spark.jars.packages",
    "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2",
)
spark = session_builder.getOrCreate()
# Sample rows as (id, creation_date, last_update_time).
# Every row carries all three fields so it matches the three-column schema below.
input_df = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ("id", "creation_date", "last_update_time"),
)
# Options passed to every Hudi write below (table identity, key fields,
# parallelism, index and cleaner settings).
hudi_options = {
    # --- table name and key/partition fields ---
    "hoodie.table.name": "hudi_test",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "last_update_time",
    "hoodie.datasource.write.partitionpath.field": "creation_date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    # --- shuffle parallelism ---
    "hoodie.upsert.shuffle.parallelism": 1,
    "hoodie.insert.shuffle.parallelism": 1,
    # --- consistency check and index ---
    "hoodie.consistency.check.enabled": True,
    "hoodie.index.type": "BLOOM",
    "hoodie.index.bloom.num_entries": 60000,
    "hoodie.index.bloom.fpp": 0.000000001,
    # --- cleaner ---
    "hoodie.cleaner.commits.retained": 2,
}
# INSERT
# Write the full sample DataFrame through the Hudi data source to /tmp/hudi_test in append mode.
insert_writer = input_df.write.format("org.apache.hudi").options(**hudi_options)
insert_writer.mode("append").save("/tmp/hudi_test")
# UPDATE
# Take one row and set its creation_date to the literal "2014-01-01",
# then write it to the same path with the same options.
single_row_df = input_df.limit(1)
update_df = single_row_df.withColumn("creation_date", lit("2014-01-01"))
update_writer = update_df.write.format("org.apache.hudi").options(**hudi_options)
update_writer.mode("append").save("/tmp/hudi_test")
# REAL UPDATE
# Take one row and change only last_update_time (creation_date is left as is),
# then write it to the same path with the same options.
single_row_df = input_df.limit(1)
update_df = single_row_df.withColumn(
    "last_update_time", lit("2016-01-01T13:51:39.340396Z")
)
real_update_writer = update_df.write.format("org.apache.hudi").options(**hudi_options)
real_update_writer.mode("append").save("/tmp/hudi_test")
# Load the table back through the Hudi data source using a two-level glob
# under /tmp/hudi_test, then print the rows.
hudi_reader = spark.read.format("org.apache.hudi")
output_df = hudi_reader.load("/tmp/hudi_test/*/*")
output_df.show()
注意: 您的更新操作实际上会创建一个新分区并执行插入操作,因为您正在修改分区列 (2015-01-01 -> 2014-01-01)。您可以在输出中看到这一点。
我另外提供了一个真正的更新示例:它只修改 last_update_time,即把分区 2015-01-01 中 id 为 100 的记录的 last_update_time 从 2015-01-01T13:51:39.340396Z 更新为 2016-01-01T13:51:39.340396Z。
更多示例可以在 Apache Hudi 官方文档中找到。