scala – 使用spark cassandra连接器更新Cassandra表

我在更新键空间中的表时遇到了 scala上的spark cassandra连接器问题

这是我的一段代码

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val KeySpace    = new CassandrasqlContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

hourUniqueKeySpace.sql(query)

当我执行此代码时,我收到这样的错误

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

知道为什么会这样吗?
我怎样才能解决这个问题?

解决方法

通过spark-cassandra-connector可以更新具有 counter column的表.您必须使用DataFrames和 DataFrameWriter方法保存模式“append”(或 SaveMode.Append,如果您愿意).检查代码 DataFrameWriter.scala.

例如,给出一个表:

cqlsh:test> SELECT * FROM name_counter ;

 name    | surname | count
---------+---------+-------
    John |   Smith |   100
   Zhang |     Wei |  1000
 Angelos |   Papas |    10

代码应该如下所示:

val updateRdd = sc.parallelize(Seq(Row("John","Smith",1L),Row("Zhang","Wei",2L),Row("Angelos","Papas",3L)))

val tblStruct = new StructType(
    Array(StructField("name",StringType,nullable = false),StructField("surname",StructField("count",LongType,nullable = false)))

val updateDf  = sqlContext.createDataFrame(updateRdd,tblStruct)

updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test","table" -> "name_counter"))
.mode("append")
.save()

更新后:

name    | surname | count
---------+---------+-------
    John |   Smith |   101
   Zhang |     Wei |  1002
 Angelos |   Papas |    13

通过隐式转换RDD to a DataFrame:import sqlContext.implicits._并使用.toDF(),DataFrame转换可以更简单.

检查此玩具应用程序的完整代码
https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

由于版本在这里非常重要,以上内容适用于Scala 2.11.7,Spark 1.5.1,spark-cassandra-connector 1.5.0-RC1-s_2.11,Cassandra 3.0.5.自@since 1.4.0以来,DataFrameWriter被指定为@Experimental.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...