问题描述
我是 Spark Streaming 的新手。我想分析从不同应用程序主机复制到 HDFS 公共目标位置的文本文件。但我得到的是空白的 DataFrame :( 没有读取到任何记录。XML 记录的解析逻辑是正确的——我已经在控制台用 RDD[String] 测试过,问题看起来出在 DStream[String] 上。请问有人可以帮忙吗?
package com.sparkstreaming.loganalysis
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._

import scala.xml.XML
/**
 * Builds a StreamingContext that watches a directory for new XML log files,
 * extracts (DATE, HOST, IP) from each record, and prints each micro-batch
 * as a DataFrame.
 *
 * NOTE(review): `textFileStream` only picks up files that are created in (or
 * atomically moved into) the monitored directory AFTER the stream starts;
 * pre-existing files are ignored. That is the usual cause of empty batches —
 * copy the files in while the application is running.
 *
 * @param sc         shared SparkContext reused by the StreamingContext
 * @param sqlContext SQLContext made statically accessible inside foreachRDD
 * @param cpDir      checkpoint directory path
 */
class addStreaming(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

  /** Factory passed to StreamingContext.getActiveOrCreate. */
  def creatingFunc(): StreamingContext = {
    val batchInterval = Seconds(4)
    val ssc = new StreamingContext(sc, batchInterval)

    // setActive lives on the SQLContext companion object; calling it on the
    // instance (`sqlContext.setActive(...)`) does not compile.
    SQLContext.setActive(sqlContext)
    ssc.checkpoint(cpDir)

    // DStream of text lines; each streamed file is expected to contain
    // complete XML documents, one per line.
    val lineStream = ssc.textFileStream(
      "/home/dataflair/SparkProjects/SparkStreamingProject/src/main/resources/")

    // Parse each line as XML and pull out the three fields of interest.
    val recordStream = lineStream.map(XML.loadString).map { doc =>
      val dt   = (doc \ "RECORD" \ "DATE").text
      val host = (doc \ "RECORD" \ "HOST").text
      val ip   = (doc \ "RECORD" \ "IP").text
      (dt, host, ip)
    }

    recordStream.foreachRDD { rdd =>
      // getOrCreate (capital O) — the original `getorCreate` does not exist.
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val streamDF = rdd.toDF("DATE", "HOST", "IP")
      streamDF.show(false)
    }
    ssc
  }
}
/**
 * Entry point: wires up the SparkContext / SQLContext and starts (or resumes
 * from checkpoint) the streaming job defined by [[addStreaming]].
 */
object textStreamAnalysis {
  def main(args: Array[String]): Unit = {
    val cpDir = "/tmp/checkpoint"
    val conf = new SparkConf().setMaster("local[*]").setAppName("textSTream")
    val sc = new SparkContext(conf)
    // The class is named SQLContext; `new sqlContext(sc)` does not compile.
    val sqlContext = new SQLContext(sc)
    val addStr = new addStreaming(sc, sqlContext, cpDir)
    // Reuse an active context, or rebuild from the checkpoint / factory.
    val xsc = StreamingContext.getActiveOrCreate(cpDir, addStr.creatingFunc _)
    xsc.start()
    xsc.awaitTermination()
  }
}
解决方法
暂未找到可以解决该程序问题的有效方法,小编正在努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)