如何处理分隔符是否出现在Spark rdd的数据中

问题描述

使用spark RDD加载文件时,如何处理数据中是否存在分隔符。

我的数据如下:

NAME|AGE|DEP
Suresh|32|BSC
"Sathish|Kannan"|30|BE

如何将此列转换为如下所示的3列。

NAME     AGE     DEP
suresh     32      Bsc
Sathish|Kannan      30     BE

请参阅我如何尝试加载数据。

scala> val rdd = sc.textFile("file:///test/Sample_dep_20.txt",2)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://Hive/Sample_dep_20.txt MapPartitionsRDD[1] at textFile at <console>:27


rdd.collect.foreach(println)

101|"Sathish|Kannan"|BSC
102|Suresh|DEP


scala> val rdd2=rdd.map(x=>x.split("\""))
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29

scala> val rdd3=rdd2.map(x=>
     | {
     | var strarr = scala.collection.mutable.ArrayBuffer[String]()
     | for(v<-x)
     | {
     | if(v.startsWith("\"") && v.endsWith("\""))
     |   strarr +=v.replace("\"","")
     | else if(v.contains(","))
     |   strarr ++=v.split(",")
     | else
     |   strarr +=v
     | }
     | strarr
     | }
     | )
rdd3: org.apache.spark.rdd.RDD[scala.collection.mutable.ArrayBuffer[String]] = MapPartitionsRDD[3] at map at <console>:31

scala> rdd3.collect.foreach(println)
ArrayBuffer(101|,Sathish|Kannan,|BSC)
ArrayBuffer(102|Suresh|DEP)

解决方法

也许您需要将"明确定义为引号字符(csv阅读器默认情况下是引号字符,但在您的情况下可能不会吗?)。因此,在读取.csv文件时,在选项中添加.option("quote","\"")应该可以。

scala> val inputds = Seq("Suresh|32|BSC","\"Satish|Kannan\"|30|BE").toDS()
inputds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val outputdf = spark.read.option("header",false).option("delimiter","|").option("quote","\"").csv(inputds)
outputdf: org.apache.spark.sql.DataFrame = [_c0: string,_c1: string ... 1 more field]

scala> outputdf.show(false)
+-------------+---+---+
|_c0          |_c1|_c2|
+-------------+---+---+
|Suresh       |32 |BSC|
|Satish|Kannan|30 |BE |
+-------------+---+---+

定义使DataFrameReader忽略带引号的字符串中的分隔符,请参见Spark API文档here

编辑

如果您想努力工作并且仍然使用普通的RDD,请尝试像这样修改split()函数:

val rdd2=rdd.map(x=>x.split("\\|(?=([^\"]*\"[^\"]*\")*[^\"]*$)"))

它使用正向预见来忽略引号内的|分隔符,并避免您在第二个.map中进行字符串操作。