问题描述
我对ETL领域很陌生,我希望使用Cassandra 3.7和Spark实现增量数据加载。我知道Cassandra的更高版本确实支持CDC,但我只能使用Cassandra 3.7。有没有一种方法,我可以通过该方法仅跟踪更改的记录并使用spark加载它们,从而执行增量数据加载?
如果无法在cassandra端完成,则在Spark方面也欢迎其他建议:)
解决方法
这是一个广泛的主题,有效的解决方案将取决于表中的数据量,表结构,数据的插入/更新方式等。此外,特定的解决方案可能取决于可用的Spark版本。纯火花方法的一个缺点是,如果没有完整的先前状态副本,就无法轻松检测数据的删除,因此您可以在两种状态之间产生差异。
在所有情况下,都需要执行全表扫描以查找更改的条目,但是如果表是专门为此任务组织的,则可以避免读取所有数据。例如,如果您的表格结构如下:
create table test.tbl (
pk int,ts timestamp,v1 ...,v2 ...,primary key(pk,ts));
然后执行以下查询:
import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl","test").load()
val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp)
AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")
然后,Spark Cassandra Connector将把该查询下推到Cassandra,并且仅读取ts
在给定时间范围内的数据-您可以通过执行filtered.explain
并同时检查两个时间来进行检查过滤器标有*
符号。
另一种检测更改的方法是从Cassandra中检索写入时间,然后根据该信息过滤掉更改。对于所有最新版本的SCC,写入时间的获取为supported in RDD API,而自supported in the Dataframe API起,获取的写入时间为release of SCC 2.5.0(至少需要Spark 2.4,尽管可能也适用于2.3)。提取此信息后,您可以对数据应用过滤器并提取更改。但是您需要记住几件事:
- 无法使用此方法检测删除
- 写入时间信息仅适用于常规和静态列,而不适用于主键列
- 如果在插入后该行进行了部分更新,则每列可能都有其自己的写入时间值
- 在大多数版本的Cassandra中,对集合列(列表/地图/集)完成的
writetime
函数调用将产生错误,并且将/可能返回null
对于用户定义的列输入
P.S。即使您启用了CDC,正确使用它也不是一件容易的事:
- 您需要删除重复项-您拥有更改的RF副本
- 例如,当节点关闭时,某些更改可能会丢失,然后通过提示或修复进行传播
- TTL不容易处理
- ...
对于CDC,您可以在2019年DataStax Accelerate会议上寻找演示文稿-有关该主题的一些演讲。