Spark:复制每一行,但更改一列的值

问题描述

如何在spark中执行以下操作@H_404_1@

Initially:
+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
+-----------+-----+------+

Final result:
+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-15 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-16 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-17 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-18 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
|2020-08-19 | 6   | mno  |
+-----------+-----+------+

因此从本质上讲,需要在每一列中更改一个列值的更改,即,对于每一行,请使用日期列减去当前值的1天作为重复。@H_404_1@

解决方法

尝试使用 date_add 函数,然后创建具有date列和date-1 列的数组,最后爆炸该列。

Example:

df.show()

/*
+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-16|   2| abc|
|2020-08-17|   3| def|
+----------+----+----+
*/

import org.apache.spark.sql.functions._

df.withColumn("new_date",array(col("date"),date_add(col("date"),-1))).
drop("date").
selectExpr("explode(new_date) as date","*").
drop("new_date").
show(10,false)

/*
+----------+----+----+
|date      |col1|col2|
+----------+----+----+
|2020-08-16|2   |abc |
|2020-08-15|2   |abc |
|2020-08-17|3   |def |
|2020-08-16|3   |def |
+----------+----+----+
*/
,

我本来以为union对于这种解决方案会很优雅,例如

// Union the two dataframes together,take 1 day away from the date
df.union(df.select(date_add($"date",-1),$"col1",$"col2"))

我在其中创建测试数据的完整示例脚本:

import org.apache.spark.sql.functions._

val dfOriginal = Seq(("2020-08-16",2,"abc"),("2020-08-17",3,"def"),("2020-08-18",4,"ghi"),("2020-08-19",5,"jkl"),("2020-08-20",6,"mno"))
  .toDF("date","col1","col2")

val df = dfOriginal
  .select (to_date($"date","yyyy-MM-dd").as("date"),$"col2")

// Union the two dataframes together,$"col2"))
  .orderBy("date","col2")
  .show

我的结果:

My results

,

这可能有点晚了,但在 python 上回答这个问题,所以其他人可能会觉得它有用。

from pyspark.sql.functions import *

初始 DF 如下所示:

+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
+-----------+-----+------+

df.withColumn("dates_array",-1))))
.drop("date")
.withColumn("date",explode("dates_array"))
.drop("dates_array")
.show()

然后你就会得到你想要的:

+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-15 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-16 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-17 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-18 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
|2020-08-19 | 6   | mno  |
+-----------+-----+------+