Spark Text File Stream Analysis

Problem Description

I am new to Spark Streaming. I want to analyze text files that are copied from different application hosts to a common target location on HDFS. I am getting an empty DataFrame :( and no records are picked up. The XML record extraction logic is correct; I have already tested it on the console against an RDD[String], but it looks like there is some issue with the DStream[String]. Could anyone please help?


    package com.sparkstreaming.loganalysis
    import org.apache.spark._
    import org.apache.spark.storage._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.receiver._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import scala.xml.XML

    class addStreaming(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {
      
      def creatingFunc(): StreamingContext = {
        val batchInterval = Seconds(4)
        val ssc = new StreamingContext(sc,batchInterval)
        // Set the active sqlContext so that we can access it statically within the foreachRDD
        SQLContext.setActive(sqlContext)
        ssc.checkpoint(cpDir)
        val streamRDD = ssc.textFileStream("/home/dataflair/SparkProjects/SparkStreamingProject/src/main/resources/")
            
       
        val resultRDD = streamRDD
        .map(scala.xml.XML.loadString _).map(x => {
          val dt = (x \ "RECORD" \ "DATE").text
          val host = (x \ "RECORD" \ "HOST").text
          val ip = (x \ "RECORD" \ "IP").text
          (dt,host,ip)
        })
        
     
        resultRDD.foreachRDD(rdd => {
            // Each batch arrives as an RDD[(String, String, String)]
            val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
            import spark.implicits._

            val streamDF = rdd.toDF("DATE", "HOST", "IP")
            
            streamDF.show(false)
            
        }) 
        
         ssc
      }
    }
    object textStreamAnalysis {
      
      def main(args: Array[String]) {
      
       val cpDir = "/tmp/checkpoint"
       val conf = new SparkConf().setMaster("local[*]").setAppName("textSTream")
       val sc = new SparkContext(conf)
       val sqlContext = new SQLContext(sc)
       val addStr = new addStreaming(sc, sqlContext, cpDir)
       val xsc = StreamingContext.getActiveOrCreate(cpDir,addStr.creatingFunc _)
       xsc.start()
       xsc.awaitTermination()
      }
    }
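
For reference, the per-record XML extraction used above can be checked on its own, without Spark Streaming, much like the console test against a plain RDD[String] that the question mentions. The sketch below is only an approximation of that check: the object name XmlExtractionCheck and the sample record layout (a wrapper element containing a RECORD node with DATE, HOST and IP children) are assumptions inferred from the (x \ "RECORD" \ "DATE") selectors in the code, not something given in the original post.

    import scala.xml.XML

    object XmlExtractionCheck {
      def main(args: Array[String]): Unit = {
        // Hypothetical sample record; the wrapper/RECORD layout is assumed from the
        // (x \ "RECORD" \ ...) selectors used in the streaming job above.
        val sample =
          """<LOG>
            |  <RECORD>
            |    <DATE>2017-01-01</DATE>
            |    <HOST>app-host-01</HOST>
            |    <IP>10.0.0.1</IP>
            |  </RECORD>
            |</LOG>""".stripMargin

        // Stands in for the RDD[String] the extraction was tested on from the console.
        val records: Seq[String] = Seq(sample)

        val parsed = records.map(XML.loadString).map { x =>
          val dt   = (x \ "RECORD" \ "DATE").text
          val host = (x \ "RECORD" \ "HOST").text
          val ip   = (x \ "RECORD" \ "IP").text
          (dt, host, ip)
        }

        parsed.foreach(println)  // prints (2017-01-01,app-host-01,10.0.0.1)
      }
    }

If this prints the expected tuple, the XML handling itself is sound, which matches what the question reports about the console test.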

Solution

No effective solution to this problem has been found yet; the editor is still working to find and compile one!
