Spark Rdd

问题描述

我必须在spark中过滤Cassandra表,在通过spark从表中获取数据之后,在返回的rdd上应用filter函数,我们不想在cassandra api中使用where子句可以过滤,但是需要在filter列,由于在cassandra中进行多个ss表扫描,因此存在磁盘开销问题。 例如:

val ct = sc.cassandratable("keyspace1","table1")
val fltr = ct.filter(x=x.contains "zz")

table1字段为:

  • 分散的uuid
  • 文件名文本
  • 事件诠释
  • eventtimestamp bigint
  • fileid int
  • 文件类型为int

基本上,我们需要基于具有任意字符串的文件名过滤数据。由于返回的rdd是类型 com.datastax.spark.connector.rdd.CassandratableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandratableScanRDD 和筛选器操作仅限于enter image description here

类型的CassandraRow方法
    val ct = sc.cassandratable("keyspace1","table1")
    scala> ct
    res140: com.datastax.spark.connector.rdd.CassandratableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandratableScanRDD[171] at RDD at CassandraRDD.scala:19

当我在“ x”之后点击制表符时。在下面的过滤器函数中,该函数在此处显示了CassandraRow类的以下输入方法

scala> ct.filter(x=>x.
columnValues   getBooleanoption   getDateTime         getFloatOption   getLongOption    getString             getUUIDOption     length
contains       getByte            getDateTimeOption   getInet          getMap           getStringOption       getvarInt         MetaData
copy           getByteOption      getDecimal          getInetoption    getRaw           getTupleValue         getvarIntOption   nameOf
dataAsstring   getBytes           getDecimalOption    getInt           getRawCql        getTupleValueOption   hashCode          size
equals         getBytesOption     getDouble           getIntOption     getSet           getUDTValue           indexOf           toMap
get            getDate            getDoubleOption     getList          getShort         getUDTValueOption     isNullAt          toString
getBoolean     getDateOption      getFloat            getLong          getShortOption   getUUID               iterator

解决方法

您需要从CassandraRow对象获取字符串字段,然后对其进行过滤。因此,这段代码如下所示:

val fltr = ct.filter(x => x.getString("filename").contains("zz"))