问题描述
我有 a text file,其中包含有关发件人和消息的信息。格式为发件人,消息。
我已将文件加载到 RDD 中并用“,”分割它们并创建了一个键值对,其中键是发送者,值是消息 RDD[(String,String)]
。
然后,我做了一个 groupByKey()
根据发件人对消息进行分组,我得到了一个 RDD[(String,Iterable[String])]
。
Array[(String,Iterable[String])] = Array((Key,CompactBuffer(value1,value2,value3,....))
现在,我想迭代值部分,并将值一个一个地存储到列表中,所以我创建了一个空 Map,其中键是字符串,值是 List[String]
首先我应该检查 Map 是否为空,如果它为空,那么我应该将第一个值添加到 Map 中存在的 List 中。
以下是我尝试过但我做不到的,当我检查地图时,它显示无。
import org.apache.spark.{SparkConf,SparkContext}
import scala.collection.mutable.ListBuffer
object Demo{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val inputFile = "D:\\MyData.txt"
val data = sc.textFile(inputFile)
val data2 = data.map(line => {val arr = line.split(",");
(arr(0),arr(1))})
val grpData = data2.groupByKey()
val myMap = scala.collection.mutable.Map.empty[String,List[String]]
for(value <- grpData.values){
val list = ListBuffer[String]()
if(myMap.isEmpty){
list += value
myMap.put("G1",list.toList)
}
}
}
在for循环中,我给出了grpData.values,因为我只需要值部分。作为发件人,我不想要我文件中的任何密钥。我只是用它们根据发件人对消息进行分组,但在 Map[String,List[String]] 中,我的键应该是 Group1、Group2 等等。值是我将从 CompactBuffer 中一一获取的消息。
首先,我应该检查 Map 是否为空,如果为空,我应该将第一条消息添加到 Map 中存在的 List 中。键应为“Group1”,值应为应存储在 List[String] 中的消息。
对于第二次迭代,Map 不会为空然后条件会转到 else 部分,在 else 部分我应该使用 lavenshtein 距离算法来比较消息。这里第一条消息已经添加到列表中,现在我应该从 Map 获取第一条消息,并使用 lavenshtein 距离算法将其与第二条消息进行比较,阈值为 70%。如果 2 条消息满足阈值,那么我应该将第二条消息添加到列表中,否则我应该将第二条消息添加到单独的列表中,并将键名保留为“G2”,依此类推。
解决方法
您可以使用 aggregateByKey
获取每个键的组合字符串列表:
val data = sc.textFile(inputFile)
val data2 = data.map(line => {val arr = line.split(","); (arr(0),arr(1))})
val result = data2.aggregateByKey(List[String]())(_ :+ _,_ ++ _)
// or to prepend rather than append,// val result = data2.aggregateByKey(List[String]())((x,y) => y :: x,_ ++ _)
如果你想要结果为 Map
,你可以这样做
val resultMap = result.toMap
,
我假设您正在尝试基于某个距离函数进行聚类,这可能就是您要查找的内容:
def isWithinThreshold(s1: String,s2: String): Boolean = ???
//2 sets are grouped when there exist elements in a both sets that are closed to each other
def combine(acc: Vector[Vector[String]],s: Vector[String]) = {
val (near,far) = acc.partition(_.exists(str => s.exists(isWithinThreshold(str,_))))
near.fold(s)(_ ++ _) +: far
}
val preClusteringGroups = grpData.values.map(_.toVector) //this is already pre-grouped with with the key from data2 (`arr(0)`)
val res = preClusteringGroups.aggregate(Vector.empty[Vector[String]])(combine,{ case (v1,v2) =>
(v1 ++ v2).foldLeft(Vector.empty[Vector[String]])(combine)
}).zipWithIndex.map { case (v,i) => s"G$i" -> v }.toMap //.mapValues(_.toList) if you actually need a list
preClusteringGroups
基于 grpData
,它已经按原始密钥预先分组,可能无法满足您的距离要求。如果是这种情况,请重新定义 preClusteringGroups
:
val preClusteringGroups = data2.values.map(Vector(_))