问题描述
这是我的表 1:
case class Table1(
key1: String // (primary key),value1: String
)
表 2:
case class Table2(
key1: String // (partition key)
key2: String // (primary key)
value1: String
)
我需要将表 1 中的 value1 复制到表 2 中分区的所有行(匹配表 1 中的 key1)。
我的代码:
val table2 = table1.map(
s => {
.... // Here code
}
)
table2.savetoCassandra("keyspaceName","table2",SomeColumns("
解决方法
在表之间进行连接,然后将数据写回,例如:
import spark.implicits._
import org.apache.spark.sql.cassandra._
val df1 = spark.read.cassandraFormat("table1","ks").load
val df2 = spark.read.cassandraFormat("table2","ks").load.select("key1","key2")
val joined = df2.join(df1,df1("key1") === df2("key2"))
.select(df2("key1"),df2("key2"),df1("value1"))
joined.write.cassandraFormat("table2","ks").save
为了使此过程高效,您需要使用自 Spark Cassandra Connector 2.5.0 以来可用的所谓直接连接(如果您有以前的版本,那么您将需要转到 RDD API 并使用函数 {{1 }}).
直接加入在进程开始时启用(.joinWithCassandraTable
或 spark-shell
),如下所示:
spark-submit
附言您可以在 following blog post 中阅读有关加入 Cassandra 的更多信息。