问题描述
我有一个实时流传输解决方案,其中有 Kafka , Spark (作为聚合引擎)和 Cassandra (作为商店)。用户定义所需的聚合,然后引擎创建该聚合并将其写入商店。这是一个如何创建聚合的示例
CREATE AGGR COUNT FROM input_data WHERE type,event,id
这将为3列创建一个 count 汇总并写入C *。
我们也需要处理历史数据。这意味着,如果今天创建了一个汇总,我们需要返回并修复其历史记录。为了满足这种用例,我们在Cassandra中创建了一个 hvalue 列。这是供参考的模式
CREATE TABLE tbl (
key blob,key2 blob,key3 blob,...
key15 blob,column1 blob,column2 blob,...
column20 blob,*hvalue* blob,*value* blob,PRIMARY KEY ((key,key2,key3 ... key15),column1 ... column20)
) WITH CLUSTERING ORDER BY (column1 ASC,column2 ASC .. column20 ASC)
值存储在线处理时计算出的事实。 hvalue 存储用于历史处理的值。查询时,将检索,合并这两个列并将其返回给用户。
我们正在使用datastax leftJoin API与Cassandra联接。
RDD.leftJoinWithCassandraTable(keyspace,tableName)
.on(SomeColumns(...)
.map { case (ip,row) => row match {
case None => ip
case Some(data) => CASSANDRA_MAP_SCHEMA(...)
)
}
}.saveToCassandra(keyspace,tableName)
简而言之,我们为RDD创建一个架构,并将该行写入Cassandra。
现在,这是问题所在。在历史过程中,我们需要创建一行以写入Cassandra。这意味着我们需要向“值”列提供一些数据。如果它是Cassandra中不存在的新行,我们将创建一个空对象并写回。如果该行存在,我们将现有值取回。 在线和历史过程将同时运行。这意味着当历史进程读取一行并回写时,在线进程可能已经创建了同一行。这将导致数据损坏,因为历史进程可能会读取过时的数据并更新在线进程写入的值。 我不确定如何解决此问题。如果有其他解决方案可以防止这种情况,我将不胜感激。 我尽力解释了一切,让我知道是否需要进一步说明,我将尝试添加更多输入。
预先感谢您的帮助。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)