如何将元素添加到 Map 中,其中键是 String,值是 Scala

问题描述

我有 a text file,其中包含有关发件人和消息的信息。格式为发件人,消息。

我已将文件加载到 RDD 中并用“,”分割它们并创建了一个键值对,其中键是发送者,值是消息 RDD[(String,String)]

然后,我做了一个 groupByKey() 根据发件人对消息进行分组,我得到了一个 RDD[(String,Iterable[String])]

Array[(String,Iterable[String])] = Array((Key,CompactBuffer(value1,value2,value3,....)) 

现在,我想迭代值部分,并将值一个一个地存储到列表中,所以我创建了一个空 Map,其中键是字符串,值是 List[String]

首先我应该检查 Map 是否为空,如果它为空,那么我应该将第一个添加到 Map 中存在的 List 中。

以下是我尝试过但我做不到的,当我检查地图时,它显示无。

import org.apache.spark.{SparkConf,SparkContext}
import scala.collection.mutable.ListBuffer
object Demo{
  def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("My App")
  val sc = new SparkContext(conf)
  val inputFile = "D:\\MyData.txt"
  val data = sc.textFile(inputFile)
  val data2 = data.map(line => {val arr = line.split(","); 
   (arr(0),arr(1))})
  val grpData = data2.groupByKey()
  val myMap = scala.collection.mutable.Map.empty[String,List[String]]
  for(value <- grpData.values){
    val list = ListBuffer[String]()
    if(myMap.isEmpty){
      list += value
      myMap.put("G1",list.toList)
    }
  }
}

在for循环中,我给出了grpData.values,因为我只需要值部分。作为发件人,我不想要我文件中的任何密钥。我只是用它们根据发件人对消息进行分组,但在 Map[String,List[String]] 中,我的键应该是 Group1、Group2 等等。值是我将从 CompactBuffer 中一一获取的消息。

首先,我应该检查 Map 是否为空,如果为空,我应该将第一条消息添加到 Map 中存在的 List 中。键应为“Group1”,值应为应存储在 List[String] 中的消息。

对于第二次迭代,Map 不会为空然后条件会转到 else 部分,在 else 部分我应该使用 lavenshtein 距离算法来比较消息。这里第一条消息已经添加到列表中,现在我应该从 Map 获取第一条消息,并使用 lavenshtein 距离算法将其与第二条消息进行比较,阈值为 70%。如果 2 条消息满足阈值,那么我应该将第二条消息添加到列表中,否则我应该将第二条消息添加到单独的列表中,并将键名保留为“G2”,依此类推。

解决方法

您可以使用 aggregateByKey 获取每个键的组合字符串列表:

val data = sc.textFile(inputFile)
val data2 = data.map(line => {val arr = line.split(","); (arr(0),arr(1))})
val result = data2.aggregateByKey(List[String]())(_ :+ _,_ ++ _)

// or to prepend rather than append,// val result = data2.aggregateByKey(List[String]())((x,y) => y :: x,_ ++ _)

如果你想要结果为 Map,你可以这样做

val resultMap = result.toMap
,

我假设您正在尝试基于某个距离函数进行聚类,这可能就是您要查找的内容:

def isWithinThreshold(s1: String,s2: String): Boolean = ???

//2 sets are grouped when there exist elements in a both sets that are closed to each other
def combine(acc: Vector[Vector[String]],s: Vector[String]) = {
  val (near,far) = acc.partition(_.exists(str => s.exists(isWithinThreshold(str,_))))

  near.fold(s)(_ ++ _) +: far
}

val preClusteringGroups = grpData.values.map(_.toVector) //this is already pre-grouped with with the key from data2 (`arr(0)`)
val res = preClusteringGroups.aggregate(Vector.empty[Vector[String]])(combine,{ case (v1,v2) =>
  (v1 ++ v2).foldLeft(Vector.empty[Vector[String]])(combine)
}).zipWithIndex.map { case (v,i) => s"G$i" -> v }.toMap //.mapValues(_.toList) if you actually need a list

preClusteringGroups 基于 grpData,它已经按原始密钥预先分组,可能无法满足您的距离要求。如果是这种情况,请重新定义 preClusteringGroups

val preClusteringGroups = data2.values.map(Vector(_))