问题描述
我是Scala和Spark的新手。
我正在尝试用Spark借助编码器(Encoder)读取文件,然后将其转换为Java/Scala对象。
第一步——应用模式(schema)并通过as使用编码器来读取文件——工作正常。
然后,我对该数据集/数据框执行一个简单的map操作,但是当我尝试打印结果数据集/数据框的schema时,却没有打印出任何列。
此外,最初读取文件时,输入数据中并没有与Person类的age字段对应的列,我只是想尝试在map函数中把它计算出来——但使用下面的代码时,我发现age根本没有被映射到结果数据框中。
Person.txt中的数据:
firstName,lastName,dob
ABC,XYZ,01/01/2019
CDE,FGH,01/02/2020
下面是代码:
// Question listing: reads Person.txt as CSV, views the rows as Person objects
// through a bean encoder, then recomputes `age` inside a typed map.
object EncoderExample extends App {
  val sparkSession = SparkSession.builder().appName("EncoderExample").master("local").getOrCreate()

  // `age` is a var with a default value because it is assigned later, inside the map below.
  case class Person(firstName: String, lastName: String, dob: String, var age: Int = 10)

  // NOTE(review): Encoders.bean is meant for JavaBean-style classes (getX/setX pairs).
  // A Scala case class exposes no such accessors, which is presumably why the schema
  // printed after the map below shows no columns. This is the issue the question asks
  // about and it is intentionally left as posted.
  implicit val encoder = Encoders.bean[Person](classOf[Person])

  // The header row supplies the column names; column types are inferred from the data.
  val personDf = sparkSession.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("Person.txt")
    .as(encoder)
  personDf.printSchema()
  personDf.show()

  // age = current year minus the text from index 6 of dob onwards (the year in "01/01/2019").
  val calAge = personDf.map(p => {
    p.age = Year.now().getValue - p.dob.substring(6).toInt
    println(p.age)
    p
  }) //.toDF()//.as(encoder)
  print("*********Person DF Schema after age calculation: ")
  calAge.printSchema()
  //calAge.show
}
解决方法
package spark
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.functions._
/**
 * One person row as it is collected from the final DataFrame.
 *
 * @param firstName given name
 * @param lastName  family name
 * @param dob       date of birth kept as text, e.g. "01/01/2019"
 * @param age       age in whole years
 */
case class Person(
  firstName: String,
  lastName: String,
  dob: String,
  age: Long
)
/**
 * Example application: builds a small DataFrame of people, adds an `age` column
 * computed from the year at the end of `dob`, and collects the rows as [[Person]] objects.
 *
 * Uses an explicit `main` rather than `extends App`: the Spark documentation notes that
 * subclasses of `scala.App` may not work correctly.
 */
object CalcAge {

  /** Current calendar year, read from the JVM's default calendar. */
  def getCurrentYear: Long = Calendar.getInstance.get(Calendar.YEAR).toLong

  /**
   * Runs the example on a local Spark session and prints each intermediate result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("DataFrame-example")
      .getOrCreate()
    // Brings toDF and the Person encoder used by as[Person] into scope.
    import spark.implicits._

    val sourceDF = Seq(
      ("ABC", "XYZ", "01/01/2019"),
      ("CDE", "FGH", "01/02/2020")
    ).toDF("firstName", "lastName", "dob")

    sourceDF.printSchema()
    // root
    // |-- firstName: string (nullable = true)
    // |-- lastName: string (nullable = true)
    // |-- dob: string (nullable = true)

    sourceDF.show(truncate = false)
    // +---------+--------+----------+
    // |firstName|lastName|dob       |
    // +---------+--------+----------+
    // |ABC      |XYZ     |01/01/2019|
    // |CDE      |FGH     |01/02/2020|
    // +---------+--------+----------+

    // age = current year minus the number after the last '/' in dob.
    val ageUDF = udf((dob: String) => getCurrentYear - dob.split("/").last.toLong)

    val df = sourceDF.withColumn("age", ageUDF(col("dob")))

    df.printSchema()
    // root
    // |-- firstName: string (nullable = true)
    // |-- lastName: string (nullable = true)
    // |-- dob: string (nullable = true)
    // |-- age: long (nullable = false)

    df.show(truncate = false)
    // Sample output from a run in 2020; the ages depend on the current year.
    // +---------+--------+----------+---+
    // |firstName|lastName|dob       |age|
    // +---------+--------+----------+---+
    // |ABC      |XYZ     |01/01/2019|1  |
    // |CDE      |FGH     |01/02/2020|0  |
    // +---------+--------+----------+---+

    // The DataFrame's column names and types now match Person's fields,
    // so the rows can be viewed as Person objects.
    val person = df.as[Person].collectAsList()
    // person: java.util.List[Person] = [Person(ABC,XYZ,01/01/2019,1), Person(CDE,FGH,01/02/2020,0)]
    println(person)
  }
}