Spark collect_list将data_type从数组更改为字符串

问题描述

正在进行以下汇总

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),collect_list(array($"b",$"data2")).alias("final_data2"))

在这里,我正在进行一些汇总,并使用collect_list收集结果。早些时候,我们使用spark 1,它为我提供了以下数据类型。

 |-- final_data1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- final_data2: array (nullable = true)
 |    |-- element: string (containsNull = true)

现在,我们必须迁移到spark 2,但我们正在使用架构。

|-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

在获得下面的first()记录时是不同的

spark 1.6

[2020-09-26,Ayush,103.67] => datatype string

spark 2 

WrappedArray(2020-09-26,103.67)

如何保持相同的数据类型?

编辑-使用Concat尝试

我获得像Spark 1.6那样的精确模式的一种方法是使用这样的concat

val df_date_agg = df
    .groupBy($"msisdn",$"event_date",$"network")
    .agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
    .groupBy($"msisdn")
    .agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit($"data_mou_dly"),lit("]")))

这会影响我的代码性能吗?有更好的方法吗?

解决方法

填充final1和final2列将解决此问题。

val data = Seq((1,"A","B"),(1,"C","D"),(2,"E","F"),"G","H"),"I","J"))

val df = spark.createDataFrame(
  data
).toDF("col1","col2","col3")

val old_df = df.groupBy(col("col1")).agg(
    collect_list(
        array(
            col("col2"),col("col3")
            )
    ).as("final")
    )
val new_df = old_df.select(col("col1"),flatten(col("final")).as("final_new"))
println("Input Dataframe")

df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()

println("New schema format")
new_df.show(false)
new_df.printSchema()

输出:

Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |A   |B   |
|1   |C   |D   |
|2   |E   |F   |
|2   |G   |H   |
|2   |I   |J   |
+----+----+----+

Old schema format
+----+------------------------+
|col1|final                   |
+----+------------------------+
|1   |[[A,B],[C,D]]        |
|2   |[[E,F],[G,H],[I,J]]|
+----+------------------------+

root
 |-- col1: integer (nullable = false)
 |-- final: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

New schema format
+----+------------------+
|col1|final_new         |
+----+------------------+
|1   |[A,B,C,D]      |
|2   |[E,F,G,H,I,J]|
+----+------------------+

root
 |-- col1: integer (nullable = false)
 |-- final_new: array (nullable = true)
 |    |-- element: string (containsNull = true)

在特定情况下

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),collect_list(array($"b",$"data2")).alias("final_data2"))
         .select(flatten(col("final_data1").as("final_data1"),flatten(col("final_data2).as("final_data2))
,

既然您想要数组的字符串表示形式,如何将数组转换成这样的字符串?

val df_date_agg = df
    .groupBy($"a",$"data1") cast "string").alias("final_data1"),$"data2") cast "string").alias("final_data2"))

这可能只是您旧版本的spark在做什么。

您提出的解决方案可能也可以很好地工作,但是没有必要用lit包装列引用(lit($"event_date"))。 $"event_date"就足够了。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...