Cassandra 3.7 CDC /增量数据加载

问题描述

我对ETL领域很陌生,我希望使用Cassandra 3.7和Spark实现增量数据加载。我知道Cassandra的更高版本确实支持CDC,但我只能使用Cassandra 3.7。有没有一种方法,我可以通过该方法仅跟踪更改的记录并使用spark加载它们,从而执行增量数据加载?

如果无法在cassandra端完成,则在Spark方面也欢迎其他建议:)

解决方法

这是一个广泛的主题,有效的解决方案将取决于表中的数据量,表结构,数据的插入/更新方式等。此外,特定的解决方案可能取决于可用的Spark版本。纯火花方法的一个缺点是,如果没有完整的先前状态副本,就无法轻松检测数据的删除,因此您可以在两种状态之间产生差异。

在所有情况下,都需要执行全表扫描以查找更改的条目,但是如果表是专门为此任务组织的,则可以避免读取所有数据。例如,如果您的表格结构如下:

create table test.tbl (
  pk int,ts timestamp,v1 ...,v2 ...,primary key(pk,ts));

然后执行以下查询:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl","test").load()
val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) 
                              AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")

然后,Spark Cassandra Connector将把该查询下推到Cassandra,并且仅读取ts在给定时间范围内的数据-您可以通过执行filtered.explain并同时检查两个时间来进行检查过滤器标有*符号。

另一种检测更改的方法是从Cassandra中检索写入时间,然后根据该信息过滤掉更改。对于所有最新版本的SCC,写入时间的获取为supported in RDD API,而自supported in the Dataframe API起,获取的写入时间为release of SCC 2.5.0(至少需要Spark 2.4,尽管可能也适用于2.3)。提取此信息后,您可以对数据应用过滤器并提取更改。但是您需要记住几件事:

  • 无法使用此方法检测删除
  • 写入时间信息仅适用于常规和静态列,而不适用于主键列
  • 如果在插入后该行进行了部分更新,则每列可能都有其自己的写入时间值
  • 在大多数版本的Cassandra中,对集合列(列表/地图/集)完成的writetime函数调用将产生错误,并且将/可能返回null对于用户定义的列输入

P.S。即使您启用了CDC,正确使用它也不是一件容易的事:

  • 您需要删除重复项-您拥有更改的RF副本
  • 例如,当节点关闭时,某些更改可能会丢失,然后通过提示或修复进行传播
  • TTL不容易处理
  • ...

对于CDC,您可以在2019年DataStax Accelerate会议上寻找演示文稿-有关该主题的一些演讲。