如何使用 Spark JavaRDD 将列拆分为多行

问题描述

加载数据:

SparkConf sc= new SparkConf().setAppName("TEST").setMaster("local[*]");
JavaSparkContext JSC = new JavaSparkContext(sc);

JavaRDD<String> strinGrdDVotes = JSC.textFile("HarryPotter.csv");

我目前将此表加载到 RDD 中:

ID A B 姓名
1 23 50 哈利;波特

我想把它转换成下表:

ID A B 姓名
1 23 50 哈利
1 23 50 波特

我发现的所有解决方案都使用了我无法使用的 Sparksql,那么我如何仅使用 flatMapmapToPair 之类的东西来获得这个结果。

可能是这样的吗?

flatMap(s -> Arrays.asList(s.split(";")).iterator())

上面的代码产生这个:

ID A B 姓名
1 23 50 哈利
波特

我知道在 Scala 中可以这样做,但我不知道如何使用 java:

val input: RDD[String] = sc.parallelize(Seq("1,23,50,Harry;Potter"))
val csv: RDD[Array[String]] = input.map(_.split(','))

val result = csv.flatMap { case Array(s1,s2,s3,s4) => s4.split(";").map(part => (s1,part)) }

解决方法

第一部分从Scala转换到Java非常简单,你只需要用map把每一行用逗号分开就可以得到一个JavaRDD<String[]>。然后使用flatMap,对于每一行,将Name对应的数组的最后一部分进行拆分,使用java流,可以将names列表的每个元素转化为一个新的列表。

这是一个完整的例子:

JavaRDD<String> input = JSC.parallelize(
        Arrays.asList("1,23,50,Harry;Potter","2,24,60,Hermione;Granger")
);

JavaRDD<String[]> result = input.map(line -> line.split(","))
        .flatMap(r -> {
            List<String> names = Arrays.asList(r[3].split(";"));

            String[][] values = names.stream()
                    .map(name -> new String[]{r[0],r[1],r[2],name})
                    .toArray(String[][]::new);

            return Arrays.asList(values).iterator();
        });

// print the result RDD
for (String[] line : result.collect()) {
    System.out.println(Arrays.toString(line));
}
// [1,Harry]
// [1,Potter]
// [2,Hermione]
// [2,Granger]