我在使用制表符分隔值(TSV)和逗号分隔值(CSV)文件时使用Spark 2.0.我想将数据加载到Spark-sql数据帧中,我想在读取文件时完全控制模式.我不希望Spark从文件中的数据中猜出架构.
解决方法
下面是一个完整的Spark 2.0示例,用于加载制表符分隔值(TSV)文件和应用模式.
我以Iris data set in TSV format from UAH.edu为例.以下是该文件的前几行:
Type PW PL SW SL 0 2 14 33 50 1 24 56 31 67 1 23 51 31 69 0 2 10 36 46 1 20 52 30 65
要强制执行模式,可以使用以下两种方法之一以编程方式构建它:
A.使用StructType创建架构:
import org.apache.spark.sql.types._ var irisSchema = StructType(Array( StructField("Type",IntegerType,true),StructField("PetalWidth",StructField("PetalLength",StructField("SepalWidth",StructField("SepalLength",true) ))
B.或者,使用case类和Encoders创建模式(这种方法不那么冗长):
import org.apache.spark.sql.Encoders case class IrisSchema(Type: Int,PetalWidth: Int,PetalLength: Int,SepalWidth: Int,SepalLength: Int) var irisSchema = Encoders.product[IrisSchema].schema
创建模式后,可以使用spark.read读入TSV文件.请注意,只要正确设置选项(“分隔符”,d)选项,您实际上也可以读取逗号分隔值(CSV)文件或任何分隔文件.此外,如果您有一个具有标题行的数据文件,请务必设置选项(“标题”,“真”).
以下是完整的最终代码:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.Encoders val spark = SparkSession.builder().getorCreate() case class IrisSchema(Type: Int,SepalLength: Int) var irisSchema = Encoders.product[IrisSchema].schema var irisDf = spark.read.format("csv"). // Use "csv" regardless of TSV or CSV. option("header","true"). // Does the file have a header line? option("delimiter","\t"). // Set delimiter to tab or comma. schema(irisSchema). // Schema that was built above. load("iris.tsv") irisDf.show(5)
这是输出:
scala> irisDf.show(5) +----+----------+-----------+----------+-----------+ |Type|PetalWidth|PetalLength|SepalWidth|SepalLength| +----+----------+-----------+----------+-----------+ | 0| 2| 14| 33| 50| | 1| 24| 56| 31| 67| | 1| 23| 51| 31| 69| | 0| 2| 10| 36| 46| | 1| 20| 52| 30| 65| +----+----------+-----------+----------+-----------+ only showing top 5 rows