I'm running into a problem with the Spark Cassandra connector in Scala when updating a table in my keyspace.
Here is a snippet of my code:
    val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
      " SET a = a + " + b + " WHERE x = " + x + " AND y = " + y + " AND z = " + z
    println(query)

    val hourUniqueKeySpace = new CassandraSQLContext(sparkContext)
    hourUniqueKeySpace.setKeyspace(KEYSPACE)
    hourUniqueKeySpace.sql(query)

This fails with:

    Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found
Any idea why this is happening?
How can I fix it?
Solution
It is possible to update a table with a counter column via spark-cassandra-connector, but you have to use DataFrames and the DataFrameWriter save path with mode "append" (or SaveMode.Append, if you prefer); see the code in DataFrameWriter.scala. The exception above is raised because the SQL parser behind CassandraSQLContext does not support UPDATE statements; as the message says, it expects an INSERT.
For example, given this table:
    cqlsh:test> SELECT * FROM name_counter ;

     name    | surname | count
    ---------+---------+-------
        John |   Smith |   100
       Zhang |     Wei |  1000
     Angelos |   Papas |    10
the code should look like this:
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val updateRdd = sc.parallelize(Seq(
      Row("John", "Smith", 1L),
      Row("Zhang", "Wei", 2L),
      Row("Angelos", "Papas", 3L)))

    val tblStruct = new StructType(Array(
      StructField("name", StringType, nullable = false),
      StructField("surname", StringType, nullable = false),
      StructField("count", LongType, nullable = false)))

    val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct)

    updateDf.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "test", "table" -> "name_counter"))
      .mode("append")
      .save()
After the update:
     name    | surname | count
    ---------+---------+-------
        John |   Smith |   101
       Zhang |     Wei |  1002
     Angelos |   Papas |    13
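To sanity-check the result from Spark rather than cqlsh, a minimal read-back sketch (assuming the same sqlContext, keyspace, and table as above) could look like this:

    // Read the counter table back as a DataFrame to verify the increments.
    val verifyDf = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "test", "table" -> "name_counter"))
      .load()

    verifyDf.show()  // should print the incremented counter values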
The DataFrame construction can be simpler by implicitly converting the RDD to a DataFrame: import sqlContext.implicits._ and use .toDF(), as sketched below.
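A minimal sketch of that shortcut, assuming the same sc and target table as above; the column names are supplied to .toDF() instead of an explicit schema:

    import sqlContext.implicits._  // brings .toDF() into scope for RDDs of tuples

    // Same increments as before, expressed as tuples with named columns.
    val updateDf = sc.parallelize(Seq(
        ("John", "Smith", 1L),
        ("Zhang", "Wei", 2L),
        ("Angelos", "Papas", 3L)))
      .toDF("name", "surname", "count")

    updateDf.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "test", "table" -> "name_counter"))
      .mode("append")
      .save()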
Check the full code of this toy application here:
https://github.com/kyrsideris/SparkUpdateCassandra/tree/master
Since versions matter a lot here, all of the above applies to Scala 2.11.7, Spark 1.5.1, spark-cassandra-connector 1.5.0-RC1-s_2.11, and Cassandra 3.0.5. Note that DataFrameWriter has been marked @Experimental since 1.4.0.