Problem description
I am trying out Kudu with Spark. I want to join two tables with the following schemas —
# This table has around 1 million records
TABLE dimensions (
    id INT32 NOT NULL,
    PRIMARY KEY (id)
)
HASH (id) PARTITIONS 32,
RANGE (id) (
    PARTITION UNBOUNDED
)
OWNER root
REPLICAS 1

# This table has 500 million records
TABLE facts (
    id INT32 NOT NULL,
    date DATE NOT NULL,
    PRIMARY KEY (id, date)
)
HASH (id) PARTITIONS 32,
RANGE (id, date) (
    PARTITION UNBOUNDED
)
OWNER root
REPLICAS 1
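For reference, DDL roughly like the following would produce tables with the layout above if created through Impala (a sketch, not how the tables were actually created here — the Spark script below creates them via `KuduContext` instead; the unbounded RANGE shown in the dumps is what you get when only hash partitioning is specified):

```sql
-- Sketch of equivalent Impala DDL for the dimensions table
CREATE TABLE dimensions (
  id INT NOT NULL,
  PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 32
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
```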
I inserted data into these tables using the following script —
// Load data to spark dataframe
val dimensions_raw = spark.sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/root/dimensions.csv")
dimensions_raw.printSchema
dimensions_raw.createOrReplaceTempView("dimensions_raw")
// Set the primary key columns
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def setNotNull(df: DataFrame, columns: Seq[String]): DataFrame = {
  val schema = df.schema
  // Rebuild the StructField for each specified column with nullable = false.
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m)
    case y: StructField => y
  })
  // Apply the new schema to the DataFrame
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
val primaryKeyCols = Seq("id") // for the `facts` table this is Seq("id", "date")
val dimensions_prep = setNotNull(dimensions_raw, primaryKeyCols)
dimensions_prep.printSchema
dimensions_prep.createOrReplaceTempView("dimensions_prep")
// Create a kudu table
import collection.JavaConverters._
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("localhost:7051",spark.sparkContext)
// Delete the table if it already exists.
if(kuduContext.tableExists("dimensions")) {
kuduContext.deleteTable("dimensions")
}
kuduContext.createTable("dimensions", dimensions_prep.schema,
  /* primary key */ primaryKeyCols,
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("id").asJava, 32))
// Load the kudu table from spark dataframe
kuduContext.insertRows(dimensions_prep,"dimensions")
// Create a DataFrame that points to the Kudu table we want to query.
val dimensions = spark.read
  .option("kudu.master", "localhost:7051")
  .option("kudu.table", "dimensions")
  .format("kudu").load
dimensions.createOrReplaceTempView("dimensions")
I ran the above script for the `facts` table as well.
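For the `facts` table, the same pipeline changes only in the key columns, the table name, and the source file; a sketch (the `/root/facts.csv` path is an assumption mirroring the `dimensions.csv` step):

```scala
// Same pipeline for the 500M-row facts table: both key columns must be non-nullable.
val facts_raw = spark.sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/root/facts.csv") // assumed path, mirroring dimensions.csv

val facts_prep = setNotNull(facts_raw, Seq("id", "date"))

if (kuduContext.tableExists("facts")) {
  kuduContext.deleteTable("facts")
}
kuduContext.createTable("facts", facts_prep.schema,
  /* primary key */ Seq("id", "date"),
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("id").asJava, 32))
kuduContext.insertRows(facts_prep, "facts")

val facts = spark.read
  .option("kudu.master", "localhost:7051")
  .option("kudu.table", "facts")
  .format("kudu").load
facts.createOrReplaceTempView("facts")
```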
I want to join `facts` with `dimensions` on `id`. I ran the following in Spark —
val query = facts.join(dimensions, facts.col("id") === dimensions.col("id"))
query.show()
// And I get the following Physical plan-
== Physical Plan ==
*(5) SortMergeJoin [id#0], [id#14], Inner
:- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#0, 200), true, [id=#43]
:     +- *(1) Scan Kudu facts [id#0,date#1] PushedFilters: [], ReadSchema: struct<id:int,date:date...
+- *(4) Sort [id#14 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#14, 200), true, [id=#49]
      +- *(3) Scan Kudu dimensions [id#14] PushedFilters: [], ReadSchema: struct<id:int>
My question is: how do I tell Spark that the tables are already sorted on `id` (the join key), so that it does not need to sort them again?
Also, the `Exchange hashpartitioning` step should not be necessary, since the tables are already hash-partitioned on `id` in Kudu.
The join query takes just under 100 seconds on a single machine running a single Kudu master and tablet server. Am I doing something wrong here, or is this the expected speed for this kind of query on Kudu?
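For reference, since `dimensions` is only about 1 million rows, one way to sidestep the shuffle and sort of the large side entirely is a broadcast join hint (a sketch of an alternative plan, not a way to make Spark reuse Kudu's partitioning; it assumes the dimension table fits in executor memory):

```scala
import org.apache.spark.sql.functions.broadcast

// Broadcasting the ~1M-row dimensions table should replace the SortMergeJoin
// (and both Exchange/Sort steps) with a BroadcastHashJoin, so the 500M-row
// facts table is scanned once and never shuffled.
val query = facts.join(broadcast(dimensions), facts.col("id") === dimensions.col("id"))
query.explain() // expect BroadcastHashJoin instead of SortMergeJoin
```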