在 Scala 中,通过 List[String] 过滤 Spark Cassandra RDD 的正确方法是什么?

问题描述

我有一个字符串格式的 id 列表,这个列表大约有 20,000 个 id:

var timelineIds = source.map(a => a.timelineid);
timelineIds = timelineIds.distinct.cache; // disticnt list we need this for later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList;

当我对我的一个 cassandra 表使用这个列表时,它工作得很好,无论时间线 IdsString 的大小如何:

var timelineHistorySource = sc.cassandraTable[Timeline]("acd","timeline_history_bytimelineid")
        .select("ownerid","userid","timelineid","timelinetype","starttime","endtime","attributes","states")
if (constrain)
    timelineHistorySource = timelineHistorySource.where("timelineid IN ?",timelineIdsString)

当我对另一个表执行此操作时,如果列表中的 ID 超过 1000,它永远不会完成:

var dispositionSource = sc.cassandraTable[DispositionSource]("acd","dispositions_bytimelineid")
            .select("ownerid","dispositionid","month","createddate","createduserid")
if(constrain)
    dispositionSource = dispositionSource.where("timelineid IN ?",timelineIdsString);

两个 cassandra 表都将键作为时间轴 ID,所以我知道它是正确的。只要timelineids 是一个小列表,此代码就可以正常工作。

有没有更好的方法来过滤 cassandra RDD?是 IN 子句的大小导致它窒息吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)