Parsing JSON data from Kafka with Spark Streaming and storing it to Kudu: java.lang.NullPointerException

Problem description

Kafka JSON data: irregular JSON, e.g. {"request_body":[{"a":"a_value","b":"b_value", ...}]}
The job automatically turns the JSON keys into table columns and adds any missing columns on the fly; that part currently works fine. The problem only appears in the step that stores the processed JSON string into the Kudu events table.
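
For reference, the "JSON keys become table columns" behavior matches Spark's built-in JSON schema inference: reading a JSON string as a Dataset[String] with spark.read.json produces a DataFrame whose columns are the JSON keys. A minimal, self-contained sketch (field names are illustrative, not from the real payload):

// Minimal sketch of Spark's JSON schema inference; field names are illustrative.
import org.apache.spark.sql.SparkSession

object JsonColumnsDemo {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("JsonColumnsDemo").master("local[*]").getOrCreate()
        import spark.implicits._

        val sample = """{"type":"track","a":"a_value","b":"b_value"}"""
        val df = spark.read.json(Seq(sample).toDS())
        df.printSchema() // columns "a", "b", "type" inferred from the JSON keys
        spark.stop()
    }
}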

import java.util.UUID
import java.time.{Instant, ZoneId, ZonedDateTime}

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kudu.client.KuduClient
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.Serialization
import org.slf4j.LoggerFactory

object KafkaToKudu {

    // Kudu masters (hostnames in the original comment: tDataAnalysis01..03:7051)
    val kuduMasters: String = System.getProperty("kuduMasters", "*.*.*.15:7051,*.*.*.16:7051,*.*.*.17:7051")
    val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
    val Topic = "MiTest"
    val spark = SparkSession.builder.appName("KuduSparkc").master("local[*]").getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    val Kuduclient = new KuduClient.KuduClientBuilder(kuduMasters).build()
    val logger = LoggerFactory.getLogger(KafkaToKudu.getClass)
    val kuduContextc = new KuduContext(kuduMasters, spark.sparkContext)

    import spark.implicits._

    // The original body is elided by the author: it enriches `data` with an
    // "Id" key and returns the result as a JSON string. Minimal stand-in below.
    def getNewJsonData(data: Map[String, Any]): String = {
        implicit val formats = Serialization.formats(NoTypeHints)
        val datas = data + ("Id" -> UUID.randomUUID().toString)
        Serialization.write(datas)
    }

    // NOTE: orid and dId are currently unused; a fresh random UUID is returned.
    def createId(orid: String, dId: String): String = UUID.randomUUID().toString

    def main(args: Array[String]): Unit = {
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "*.*.*.15:9092,*.*.*.16:9092,*.*.*.17:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "separate_id_for_each_stream",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val topics = Array(Topic)
        val kafkaStream = KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
        kafkaStream.foreachRDD(rdd => {
            rdd.foreachPartition(partitions => {
                implicit val formats = Serialization.formats(NoTypeHints)

                partitions.foreach(row => {
                    val rows = row.value()
                    // Pull the "request_body" array out of the outer JSON object.
                    val ab = parse(rows).extract[Map[String, Any]].get("request_body")
                    try {
                        val ap = parse(ab.get.toString).extract[List[Map[String, Any]]]
                        for (acc <- ap) {
                            val datas = acc
                            val tp = datas.get("type").get.asInstanceOf[String]

                            tp match {
                                case "track" =>
                                    val newdata: String = getNewJsonData(datas)
                                    try {
                                        // This runs inside foreachPartition, i.e. on an executor.
                                        val sp = SparkSession.builder.config(spark.sparkContext.getConf).getOrCreate().sqlContext
                                        import sp.implicits._
                                        val dd = sp.createDataset(Seq(newdata)) // <== java.lang.NullPointerException is thrown here
                                        val df = sp.read.json(dd)
                                        kuduContextc.insertRows(df, "events")
                                    } catch {
                                        case e: Exception => println(e)
                                    }
                                case _ =>
                                    println(tp)
                            }
                        }
                    } catch {
                        case e: Exception => println(e)
                    }
                })
            })
        })
        ssc.start()

        ssc.awaitTermination()
    }

}
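
A precondition worth noting: kuduContextc.insertRows(df, "events") does not create the target table, so the events table must already exist in Kudu with columns matching the DataFrame schema. A quick driver-side guard, reusing the fields defined above:

// Sketch: fail fast if the target Kudu table is missing.
if (!kuduContextc.tableExists("events")) {
    logger.error("Kudu table 'events' does not exist; insertRows will fail")
}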

When execution reaches val dd = sp.createDataset(Seq(newdata)), I get:

java.lang.NullPointerException

Solution

No effective fix for this program has been found yet; the editor is still searching and will update this post.
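
One direction worth investigating, though it has not been verified against this exact program: SparkSession and SQLContext are driver-side objects, and the closure passed to foreachPartition runs on executors, where those deserialized fields are null; Spark does not support creating Datasets or DataFrames inside executor-side code, which matches an NPE at sp.createDataset. The usual restructuring keeps the plain JSON work on the executors and moves all Dataset/DataFrame creation to the driver, inside foreachRDD. A sketch reusing the names from the code above:

// Sketch, not a verified fix: executors only transform plain strings;
// the DataFrame is built on the driver, where the SparkSession is valid.
kafkaStream.foreachRDD(rdd => {
    // Runs on executors: extract the "track" entries as enriched JSON strings.
    val trackJson = rdd.mapPartitions(records => {
        implicit val formats = Serialization.formats(NoTypeHints)
        records.flatMap(record =>
            (parse(record.value()) \ "request_body")
                .extract[List[Map[String, Any]]]
                .filter(_.get("type").contains("track"))
                .map(getNewJsonData)
        )
    })

    // Runs on the driver: createDataset/read.json are safe here.
    if (!trackJson.isEmpty()) {
        val df = spark.read.json(spark.createDataset(trackJson)) // encoder from spark.implicits._ above
        kuduContextc.insertRows(df, "events")
    }
})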

If you have found a good solution, please send it to the editor together with a link to this post.

Editor's email: dio#foxmail.com (replace # with @)