问题描述
我需要有关以下用例的帮助:
问题 1:我的 RDD 格式如下。现在从这个 RDD 中,我想排除 airport.code in("PUN","HAR","KAS")
case class airport(code:String,city:String,airportname:String)
val airportRdd=sparkSession.sparkContext.textFile("src/main/resources/airport_data.csv").
map(x=>x.split(","))
val airPortRddTransformed=airportRdd.map(x=>airport(x(0),x(1),x(2)))
val trasnformedRdd=airPortRddTransformed.filter(air=>!(air.code.contains(seqValues:_*)))
但是!不起作用。它告诉我无法解析符号!。有人可以帮助我。如何在 RDD 中进行否定。我只能使用 RDD 方法。
还有一个问题:
val seqColumns=List("lat","longi","height","country")
我想在加载 RDD 时排除这些列。我该怎么做。我的生产 RDD 有 70 列,我只知道要排除的列名。不是每列的索引。再次在RDD 方法。我知道如何在 Dataframe 方法中做到这一点。
解决方法
问题 1
使用 broadcast
将值列表传递给 filter
函数。过滤器中的 _* 似乎不起作用。我将条件更改为 !seqValues.value.contains(air.code)
数据样本:airport_data.csv
C001,Pune,Pune Airport
C002,Mumbai,Chhatrapati Shivaji Maharaj International Airport
C003,New York,New York Airport
C004,Delhi,Delhi Airport
代码片段
case class airport(code:String,city:String,airportname:String)
val seqValues=spark.sparkContext.broadcast(List("C001","C003"))
val airportRdd = spark.sparkContext.textFile("D:\\DataAnalysis\\airport_data.csv").map(x=>x.split(","))
val airPortRddTransformed = airportRdd.map(x=>airport(x(0),x(1),x(2)))
//airPortRddTransformed.foreach(println)
val trasnformedRdd = airPortRddTransformed.filter(air => !seqValues.value.contains(air.code))
trasnformedRdd.foreach(println)
输出 ->
airport(C002,Chhatrapati Shivaji Maharaj International Airport)
airport(C004,Delhi Airport)
,
我会改变的事情:
1- 您正在将 .csv
作为 TextFile 读取,然后根据 ,
拆分行。您可以通过阅读文件来保存这一步:
val df = spark.read.csv("src/main/resources/airport_data.csv")
2- 更改 contains
val trasnformedRdd = airPortRddTransformed.filter(air => !(seqValues.contains(air.code)))