Problem description
There is an input dataframe df (with 10 columns, col1-col10). I add a new column uuid to it using the UDF below, producing another dataframe newdf. Next, from the newdf dataframe I create two separate dataframes, df1 (uuid, col1-col5) and df2 (uuid, col6-col10), each containing only the columns mentioned.
Here is the problem: I want the uuid column to be identical and unique for corresponding rows of the df1 and df2 dataframes. Because Spark uses lazy evaluation, the UDF runs again when I write out df1 and df2, so it produces a different uuid value for each row in the df1 and df2 dataframes.
My workaround is to first write the newdf dataframe to a temporary path and then read it back. However, this approach does not scale well to large volumes of data.
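As an aside, the re-evaluation behavior can be illustrated without Spark at all: a Scala lazy view, like an uncached Spark plan, re-runs its mapping function on every materialization. A minimal sketch (pure Scala, no Spark; the names are illustrative only):

```scala
import java.util.UUID

object LazyUuidDemo extends App {
  // A lazy view re-applies its mapping function on every traversal,
  // analogous to Spark re-executing a non-deterministic UDF per action.
  val rows = Seq("A", "B", "C")
  val withUuid = rows.view.map(r => (r, UUID.randomUUID().toString))

  // Each materialization generates fresh random UUIDs.
  val firstPass  = withUuid.toList.map(_._2)
  val secondPass = withUuid.toList.map(_._2)

  // The two passes disagree, just as df1 and df2 see different uuid values.
  assert(firstPass != secondPass)
}
```

This is why writing df1 and df2 separately yields mismatched uuid values: each write triggers its own evaluation of the random UDF.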
Below is a code snippet:
df.show(false)
+------+------+------+------+------+------+------+------+------+-------+
| col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 |
+------+------+------+------+------+------+------+------+------+-------+
| A1 | A2 | A3 | A4 | A5 | A6 | A7 | A8 | A9 | A10 |
| B1 | B2 | B3 | B4 | B5 | B6 | B7 | B8 | B9 | B10 |
| C1 | C2 | C3 | C4 | C5 | C6 | C7 | C8 | C9 | C10 |
+------+------+------+------+------+------+------+------+------+-------+
val uuid = udf(() => java.util.UUID.randomUUID().toString)
val newdf = df.withColumn("uuid",uuid())
val df1 = newdf.select("uuid","col1","col2","col3","col4","col5")
val df2 = newdf.select("uuid","col6","col7","col8","col9","col10")
df1.write.format("parquet").save("/df1/")
df2.write.format("parquet").save("/df2/")
df1.show()
+-----------------+------+------+------+------+------+
| uuid | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1 | A2 | A3 | A4 | A5 |
|1dbcecf-1304-4a4e| B1 | B2 | B3 | B4 | B5 |
|1vbdecf-8406-4a4e| C1 | C2 | C3 | C4 | C5 |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
| uuid | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|2aodecf-3303-6n5e| A6 | A7 | A8 | A9 | A10 |
|2docecf-6305-6n5e| B6 | B7 | B8 | B9 | B10 |
|2vodecf-1406-6n5e| C6 | C7 | C8 | C9 | C10 |
+-----------------+------+------+------+------+------+
Expected output: the same uuid for each row across both dataframes
df1.show()
+-----------------+------+------+------+------+------+
| uuid | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1 | A2 | A3 | A4 | A5 |
|1dbcecf-1304-4a4e| B1 | B2 | B3 | B4 | B5 |
|1vbdecf-8406-4a4e| C1 | C2 | C3 | C4 | C5 |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
| uuid | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A6 | A7 | A8 | A9 | A10 |
|1dbcecf-1304-4a4e| B6 | B7 | B8 | B9 | B10 |
|1vbdecf-8406-4a4e| C6 | C7 | C8 | C9 | C10 |
+-----------------+------+------+------+------+------+
Please suggest the best way to overcome this problem.
Solution
Try this:
The solution uses the Spark Scala API with UUID.nameUUIDFromBytes as a udf.
Load the provided test data:
df.show(false)
df.printSchema()
/**
* +----+----+----+----+----+----+----+----+----+-----+
* |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|
* +----+----+----+----+----+----+----+----+----+-----+
* |A1 |A2 |A3 |A4 |A5 |A6 |A7 |A8 |A9 |A10 |
* |B1 |B2 |B3 |B4 |B5 |B6 |B7 |B8 |B9 |B10 |
* |C1 |C2 |C3 |C4 |C5 |C6 |C7 |C8 |C9 |C10 |
* +----+----+----+----+----+----+----+----+----+-----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
* |-- col4: string (nullable = true)
* |-- col5: string (nullable = true)
* |-- col6: string (nullable = true)
* |-- col7: string (nullable = true)
* |-- col8: string (nullable = true)
* |-- col9: string (nullable = true)
* |-- col10: string (nullable = true)
*/
Generate the same UUID for each row by hashing its content:
import java.nio.charset.StandardCharsets
import java.util.UUID

val uuid = udf((s: String) => UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString)
val newdf = df.withColumn("uuid",uuid(concat_ws(":",df.columns.map(col): _*)))
val df1 = newdf.select("uuid","col1","col2","col3","col4","col5")
val df2 = newdf.select("uuid","col6","col7","col8","col9","col10")
df1.show(false)
/**
* +------------------------------------+----+----+----+----+----+
* |uuid |col1|col2|col3|col4|col5|
* +------------------------------------+----+----+----+----+----+
* |0c26ece0-708a-3105-896f-e70d18b67766|A1 |A2 |A3 |A4 |A5 |
* |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B1 |B2 |B3 |B4 |B5 |
* |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C1 |C2 |C3 |C4 |C5 |
* +------------------------------------+----+----+----+----+----+
*/
df2.show(false)
/**
* +------------------------------------+----+----+----+----+-----+
* |uuid |col6|col7|col8|col9|col10|
* +------------------------------------+----+----+----+----+-----+
* |0c26ece0-708a-3105-896f-e70d18b67766|A6 |A7 |A8 |A9 |A10 |
* |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B6 |B7 |B8 |B9 |B10 |
* |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C6 |C7 |C8 |C9 |C10 |
* +------------------------------------+----+----+----+----+-----+
*/
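This works because UUID.nameUUIDFromBytes is deterministic: it derives a version-3 (name-based, MD5) UUID from the input bytes, so re-evaluating the UDF on the same row content always reproduces the same value. A Spark-free sketch of that property (the rowKey value mimics what concat_ws(":", ...) would produce for row A; the helper name is my own):

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

object StableUuidDemo extends App {
  // Version-3 UUIDs are an MD5 hash of the input bytes, so identical
  // input always yields the identical UUID, even across JVM runs.
  def stableUuid(s: String): String =
    UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString

  // What concat_ws(":", col1, ..., col10) would emit for row A.
  val rowKey = "A1:A2:A3:A4:A5:A6:A7:A8:A9:A10"

  val u1 = stableUuid(rowKey)
  val u2 = stableUuid(rowKey)
  assert(u1 == u2)                          // deterministic across evaluations
  assert(UUID.fromString(u1).version == 3)  // name-based UUID
}
```

One caveat: because the UUID is derived purely from row content, two rows that are identical in all ten columns will receive the same uuid. If duplicate rows are possible, include some distinguishing value in the hashed string before applying this approach.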