问题描述
我正在进行以下汇总
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),collect_list(array($"b",$"data2")).alias("final_data2"))
在这里,我正在进行一些汇总,并使用collect_list
收集结果。早些时候,我们使用spark 1,它为我提供了以下数据类型。
|-- final_data1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- final_data2: array (nullable = true)
| |-- element: string (containsNull = true)
现在,我们必须迁移到spark 2,但我们正在使用架构。
|-- final_data1: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- final_data1: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
在获得下面的first()
记录时是不同的
spark 1.6
[2020-09-26,Ayush,103.67] => datatype string
spark 2
WrappedArray(2020-09-26,103.67)
如何保持相同的数据类型?
编辑-使用Concat尝试
我获得像Spark 1.6那样的精确模式的一种方法是使用这样的concat
val df_date_agg = df
.groupBy($"msisdn",$"event_date",$"network")
.agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
.groupBy($"msisdn")
.agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit($"data_mou_dly"),lit("]")))
解决方法
填充final1和final2列将解决此问题。
val data = Seq((1,"A","B"),(1,"C","D"),(2,"E","F"),"G","H"),"I","J"))
val df = spark.createDataFrame(
data
).toDF("col1","col2","col3")
val old_df = df.groupBy(col("col1")).agg(
collect_list(
array(
col("col2"),col("col3")
)
).as("final")
)
val new_df = old_df.select(col("col1"),flatten(col("final")).as("final_new"))
println("Input Dataframe")
df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()
println("New schema format")
new_df.show(false)
new_df.printSchema()
输出:
Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1 |A |B |
|1 |C |D |
|2 |E |F |
|2 |G |H |
|2 |I |J |
+----+----+----+
Old schema format
+----+------------------------+
|col1|final |
+----+------------------------+
|1 |[[A,B],[C,D]] |
|2 |[[E,F],[G,H],[I,J]]|
+----+------------------------+
root
|-- col1: integer (nullable = false)
|-- final: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
New schema format
+----+------------------+
|col1|final_new |
+----+------------------+
|1 |[A,B,C,D] |
|2 |[E,F,G,H,I,J]|
+----+------------------+
root
|-- col1: integer (nullable = false)
|-- final_new: array (nullable = true)
| |-- element: string (containsNull = true)
在特定情况下
val df_date_agg = df
.groupBy($"a",$"b",$"c")
.agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
.groupBy($"a")
.agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),collect_list(array($"b",$"data2")).alias("final_data2"))
.select(flatten(col("final_data1").as("final_data1"),flatten(col("final_data2).as("final_data2))
,
既然您想要数组的字符串表示形式,如何将数组转换成这样的字符串?
val df_date_agg = df
.groupBy($"a",$"data1") cast "string").alias("final_data1"),$"data2") cast "string").alias("final_data2"))
这可能只是您旧版本的spark在做什么。
您提出的解决方案可能也可以很好地工作,但是没有必要用lit
包装列引用(lit($"event_date")
)。 $"event_date"
就足够了。