使用 coalesce 函数将 RDD 保存为 CSV 文件

问题描述

我正在尝试在 IntelliJ 中使用 Apache Spark 流式传输 Twitter 数据,但是当我调用 coalesce 函数时,编译器报错"无法解析符号 coalesce"(cannot resolve symbol coalesce)。这是我的主要代码

    // Driver-side session and a 5-second micro-batch streaming context.
    // NOTE: the method is getOrCreate() — the original `getorCreate()` does not compile.
    val spark = SparkSession.builder().appName("twitterStream").master("local[*]").getOrCreate()
    import spark.implicits._
    val sc: SparkContext = spark.sparkContext
    val streamContext = new StreamingContext(sc, Seconds(5))

    // Receive only tweets matching the filter terms, then keep English ones.
    val filters = Array("Singapore")
    val filtered = TwitterUtils.createStream(streamContext, None, filters)
    val englishTweets = filtered.filter(_.getLang() == "en")

    //englishTweets.print()

    englishTweets.foreachRDD { rdd =>
      // Re-obtain the session inside the closure so this also works after recovery.
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val tweets = rdd.map { field =>
        (
          field.getId,
          field.getUser.getScreenName,
          field.getCreatedAt.toInstant.toString,
          // Keep only alphanumeric tokens, re-join with spaces, trim edges.
          field.getText.toLowerCase.split(" ")
            .filter(_.matches("^[a-zA-Z0-9 ]+$"))
            .fold("")((a, b) => a + " " + b)
            .trim,
          sentiment(field.getText)
        )
      }
      val tweetsdf = tweets.toDF("userID", "user", "createdAt", "text", "sentimentType")
      tweetsdf.printSchema()
      tweetsdf.show(false)
      // coalesce/write must be applied to the DataFrame INSIDE foreachRDD.
      // foreachRDD returns Unit, so chaining .coalesce(1) after the closing brace
      // is exactly what produced "cannot resolve symbol coalesce".
      // mode("append") because this closure runs once per micro-batch and the
      // output path would otherwise already exist on the second batch.
      tweetsdf.coalesce(1)
        .write
        .mode("append")
        .csv("hdfs://localhost:9000/usr/sparkApp/test/testing.csv")
    }

解决方法

我用自己的数据集试过:先读取数据集,然后在写出时应用 coalesce 函数,得到了预期的结果。请参考下面的示例,可能对你有帮助。

import org.apache.spark.sql.SparkSession
import com.spark.Rdd.DriverProgram
import org.apache.log4j.{ Logger,Level }
import org.apache.spark.sql.SaveMode
import java.sql.Date

object JsonDataDF {

  // Points Hadoop at the directory containing winutils.exe (needed on Windows).
  // The original had this statement duplicated; once is enough.
  System.setProperty("hadoop.home.dir", "C:\\hadoop")
  // Suppress Spark's verbose INFO logging; only warnings and above are shown.
  Logger.getLogger("org").setLevel(Level.WARN)

  // Schema of each record in the multi-line JSON input file.
  case class AOK(appDate: Date, arr: String, base: String, Comments: String)

  val dp = new DriverProgram
  val spark = dp.getSparkSession()

  def main(args: Array[String]): Unit = {

    import spark.implicits._
    // Read the whole multi-line JSON file and bind it to the AOK schema.
    val jsonDf = spark.read
      .option("multiline", "true")
      .json("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JSONdata.txt")
      .as[AOK]

    // coalesce(1) collapses the data to a single partition so the output
    // directory contains exactly one CSV part file.
    jsonDf.coalesce(1)
          .write
          .mode(SaveMode.Overwrite)
          .option("header", "true")
          .format("csv")
          .save("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JsonToCsv")
  }

}