How to keep the same uuid across two separate DataFrames?

Problem Description

There is an input DataFrame df (with 10 columns, col1-col10). I add a new column uuid to it using the UDF below, which produces another DataFrame newdf.

Next, from the newdf DataFrame I create two separate DataFrames, df1 (uuid, col1-col5) and df2 (uuid, col6-col10), each containing only the columns mentioned.

The problem arises here: I want the uuid column for corresponding rows in df1 and df2 to be identical and unique.

Because Spark evaluates lazily, the UDF is re-executed each time df1 and df2 are written, so every row ends up with a different uuid value in the df1 and df2 DataFrames.

What I have tried so far

My current workaround is to first write the newdf DataFrame to a temporary path and then read it back. However, this approach does not scale well for large volumes of data.
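For reference, a minimal sketch of that write-and-read-back workaround (the temporary path /tmp/newdf/ is hypothetical):

newdf.write.format("parquet").save("/tmp/newdf/")
// Reading the files back materializes the generated uuid values,
// so both downstream selects see the same ones
val materialized = spark.read.parquet("/tmp/newdf/")
val df1 = materialized.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = materialized.select("uuid", "col6", "col7", "col8", "col9", "col10")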

Below is a code snippet illustrating the problem:

df.show(false)
+------+------+------+------+------+------+------+------+------+-------+
| col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 |
+------+------+------+------+------+------+------+------+------+-------+
| A1   | A2   | A3   | A4   | A5   | A6   | A7   | A8   | A9   |  A10  |
| B1   | B2   | B3   | B4   | B5   | B6   | B7   | B8   | B9   |  B10  |
| C1   | C2   | C3   | C4   | C5   | C6   | C7   | C8   | C9   |  C10  |
+------+------+------+------+------+------+------+------+------+-------+
  
val uuid = udf(() => java.util.UUID.randomUUID().toString)
val newdf = df.withColumn("uuid",uuid())

val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")

df1.write.format("parquet").save("/df1/")
df2.write.format("parquet").save("/df2/")

df1.show()
+-----------------+------+------+------+------+------+
|     uuid        | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1   | A2   | A3   | A4   | A5   |
|1dbcecf-1304-4a4e| B1   | B2   | B3   | B4   | B5   |
|1vbdecf-8406-4a4e| C1   | C2   | C3   | C4   | C5   |
+-----------------+------+------+------+------+------+

df2.show()
+-----------------+------+------+------+------+------+
|     uuid        | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|2aodecf-3303-6n5e| A6   | A7   | A8   | A9   | A10  |
|2docecf-6305-6n5e| B6   | B7   | B8   | B9   | B10  |
|2vodecf-1406-6n5e| C6   | C7   | C8   | C9   | C10  |
+-----------------+------+------+------+------+------+

Expected output: the same uuid for corresponding rows in both DataFrames

df1.show()
+-----------------+------+------+------+------+------+
|     uuid        | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1   | A2   | A3   | A4   | A5   |
|1dbcecf-1304-4a4e| B1   | B2   | B3   | B4   | B5   |
|1vbdecf-8406-4a4e| C1   | C2   | C3   | C4   | C5   |
+-----------------+------+------+------+------+------+

df2.show()
+-----------------+------+------+------+------+------+
|     uuid        | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A6   | A7   | A8   | A9   | A10  |
|1dbcecf-1304-4a4e| B6   | B7   | B8   | B9   | B10  |
|1vbdecf-8406-4a4e| C6   | C7   | C8   | C9   | C10  |
+-----------------+------+------+------+------+------+

Please suggest the best way to overcome this problem.

Solution

Try this:

The solution uses the Spark Scala API.

Use UUID.nameUUIDFromBytes as the udf. It builds a name-based (version 3, MD5) UUID deterministically from its input bytes, so re-evaluating the UDF on the same row content always yields the same uuid.
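A quick standalone illustration of that determinism (plain Scala, no Spark required):

    import java.util.UUID
    import java.nio.charset.StandardCharsets

    // The same input bytes always map to the same name-based UUID
    val a = UUID.nameUUIDFromBytes("A1:A2".getBytes(StandardCharsets.UTF_8))
    val b = UUID.nameUUIDFromBytes("A1:A2".getBytes(StandardCharsets.UTF_8))
    assert(a == b)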

Load the provided test data:

    df.show(false)
    df.printSchema()
    /**
      * +----+----+----+----+----+----+----+----+----+-----+
      * |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|
      * +----+----+----+----+----+----+----+----+----+-----+
      * |A1  |A2  |A3  |A4  |A5  |A6  |A7  |A8  |A9  |A10  |
      * |B1  |B2  |B3  |B4  |B5  |B6  |B7  |B8  |B9  |B10  |
      * |C1  |C2  |C3  |C4  |C5  |C6  |C7  |C8  |C9  |C10  |
      * +----+----+----+----+----+----+----+----+----+-----+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: string (nullable = true)
      * |-- col3: string (nullable = true)
      * |-- col4: string (nullable = true)
      * |-- col5: string (nullable = true)
      * |-- col6: string (nullable = true)
      * |-- col7: string (nullable = true)
      * |-- col8: string (nullable = true)
      * |-- col9: string (nullable = true)
      * |-- col10: string (nullable = true)
      */

Create identical UUIDs:


    import java.util.UUID
    import java.nio.charset.StandardCharsets
    import org.apache.spark.sql.functions._

    // Derive the uuid from the row's own content so recomputation is stable
    val uuid = udf((s: String) => UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString)
    val newdf = df.withColumn("uuid", uuid(concat_ws(":", df.columns.map(col): _*)))

    val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
    val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")
    df1.show(false)
    /**
      * +------------------------------------+----+----+----+----+----+
      * |uuid                                |col1|col2|col3|col4|col5|
      * +------------------------------------+----+----+----+----+----+
      * |0c26ece0-708a-3105-896f-e70d18b67766|A1  |A2  |A3  |A4  |A5  |
      * |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B1  |B2  |B3  |B4  |B5  |
      * |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C1  |C2  |C3  |C4  |C5  |
      * +------------------------------------+----+----+----+----+----+
      */

    df2.show(false)
    /**
      * +------------------------------------+----+----+----+----+-----+
      * |uuid                                |col6|col7|col8|col9|col10|
      * +------------------------------------+----+----+----+----+-----+
      * |0c26ece0-708a-3105-896f-e70d18b67766|A6  |A7  |A8  |A9  |A10  |
      * |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B6  |B7  |B8  |B9  |B10  |
      * |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C6  |C7  |C8  |C9  |C10  |
      * +------------------------------------+----+----+----+----+-----+
      */
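One caveat: a content-derived uuid is only unique when the concatenated column values are unique, so two fully identical rows would receive the same uuid. As an alternative sketch, Spark's built-in md5 (or sha2) over the same concatenation avoids the UDF entirely, at the cost of producing a hex digest rather than a UUID-formatted string:

    import org.apache.spark.sql.functions._

    // Deterministic hex digest per row; same stability property, different format
    val hashed = df.withColumn("row_hash", md5(concat_ws(":", df.columns.map(col): _*)))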
