JMSConnection serialization failure

Problem description

I am currently building an application that reads messages (transactions in JSON) from a Kafka topic and sends them on to IBM MQ. I am running into trouble with serialization of the JMS classes and am somewhat lost about how to solve it. My code is:

object DispatcherMqApp extends Serializable {

  private val logger = LoggerFactory.getLogger(this.getClass)
  val config = ConfigFactory.load()

  def inicialize(transactionType: String) = {

    val spark = new SparkConf()
      .setAppName("Dispatcher MQ Categorization")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.stopGracefullyOnShutDown","true")
    logger.debug(s"Loading configuration at ${printConfig(config).head} =>\n${printConfig(config)(1)}")

    val kafkaConfig = KafkaConfig.buildFromConfiguration(config,"dispatcher-mq")
    val streamCtx = new StreamingContext(spark,Seconds(kafkaConfig.streamingInterval))

    sys.ShutdownHookThread {
      logger.warn("Stopping the application ...")
      streamCtx.stop(stopSparkContext = true,stopGracefully = true)
      logger.warn("Application Finish with Success !!!")
    }

    val topic = config.getString(s"conf.dispatcher-mq.consumer-topic.$transactionType")
    logger.info(s"Topic: $topic")
    val zkdir = s"${kafkaConfig.zookeeperBaseDir}$transactionType-$topic"
    val kafkaManager = new KafkaManager(kafkaConfig)
    val stream = kafkaManager.createStreaming(streamCtx,kafkaConfig.offset,topic,zkdir)
    val kafkaSink = streamCtx.sparkContext.broadcast(kafkaManager.createProducer())
    val mqConfig = MQConfig(
      config.getString("conf.mq.name"),
      config.getString("conf.mq.host"),
      config.getInt("conf.mq.port"),
      config.getString("conf.mq.channel"),
      config.getString("conf.mq.queue-manager"),
      config.getInt("conf.mq.retries"),
      config.getString("conf.mq.app-name"),
      Try(config.getString("conf.mq.user")).toOption,
      Try(config.getString("conf.mq.password")).toOption,
      config.getString("conf.dispatcher-mq.send.category_file"))

    val queueConn = new MQService(mqConfig)

    (stream,queueConn,streamCtx,kafkaSink,zkdir)
  }

  def main(args: Array[String]): Unit = {
    val transactionType = args.head
    if (transactionType == "account" || transactionType == "credit") {
      val (messages, queueConn, sc, kafkaSink, zkdir) = inicialize(transactionType)
      val fieldsType = config.getString(s"conf.dispatcher-mq.send.fields.$transactionType")
      val source = config.getString("conf.dispatcher-mq.parameters.source")
      val mqVersion = config.getString(s"conf.dispatcher-mq.parameters.version.$transactionType")
      val topicError = config.getString("conf.kafka.topic_error")

      messages.foreachRDD(rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map(_._2).filter(_.toUpperCase.contains("BYCATEGORIZER"))
          .foreach(message => {
            val msg:Option[TextMessage] = try {
              Some(queueConn.createOutputMq(message,fieldsType,source,mqVersion))
            } catch {
              case ex: Exception =>
                logger.error(s"[ERROR] input: [[$message]]\n$ex")
                val errorReport = ErrorReport("GENERAL","DISPATCHER-MQ",transactionType.toString,ex.getMessage,None,Option(ex.toString))
                ErrorReportService.sendError(errorReport,topicError,kafkaSink.value)
                None
            }
            msg.foreach(queueConn.submit)
          })
        logger.info(s"Save Offset in  $zkdir...\n${offsetRanges.toList.to}")
        ZookeeperConn.saveOffsets(zkdir,offsetRanges)
      })


      sc.start()
      sc.awaitTermination()

    } else
      logger.error(s"${args.head} is not a valid argument. ( account or credit ) !!! ")
  }
}

I am getting an error serializing the JMSConnection, which is used inside the createOutputMq method. The error is:

20/09/04 17:21:00 ERROR JobScheduler: Error running job streaming job 1599250860000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
    at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:80)
    at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:76)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
20/09/04 17:21:00 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    ... (same stack trace and cause as the JobScheduler error above)

Does anyone know how to solve it? The lines shown in the error message (76 and 80) are my messages.foreachRDD(rdd => { and .foreach(message => { lines, respectively. Thanks in advance.

Solution

No confirmed fix for this specific program has been posted yet.
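That said, this class of Task not serializable error is usually fixed by making sure the live JMS connection never travels inside the closure that Spark ships to the executors; only serializable configuration should cross that boundary. One common pattern is foreachPartition: build the connection on the executor, once per partition. Below is a minimal sketch of the foreachRDD body restructured that way. It reuses the names from the question and assumes inicialize is changed to hand the plain MQConfig (which is just data) back to main instead of the connected MQService; the error-report branch is omitted for brevity.

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.map(_._2)
    .filter(_.toUpperCase.contains("BYCATEGORIZER"))
    .foreachPartition { partition =>
      // Runs on the executor: the connection is created here, so nothing
      // JMS-related ever needs to be serialized by the driver.
      val queueConn = new MQService(mqConfig)
      partition.foreach { message =>
        Try(queueConn.createOutputMq(message, fieldsType, source, mqVersion))
          .toOption
          .foreach(queueConn.submit)
      }
    }

  logger.info(s"Saving offsets in $zkdir ...\n${offsetRanges.toList}")
  ZookeeperConn.saveOffsets(zkdir, offsetRanges)
}

One connection per partition keeps the number of MQ connections bounded; if that is still too many, the service can cache a single connection per executor JVM in a companion object.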

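Another option keeps main exactly as it is: mark the JMS members of MQService as @transient lazy val, so they are skipped when the object is serialized and re-created on first use inside whichever JVM touches them. A hypothetical sketch follows; the MQConfig field names and the IBM MQ factory setters are assumptions based on the constructor call in the question, not code taken from it.

import javax.jms.{Connection, Session}
import com.ibm.mq.jms.MQConnectionFactory
import com.ibm.msg.client.wmq.WMQConstants

class MQService(conf: MQConfig) extends Serializable {

  // @transient: never written out with the closure; lazy: rebuilt on
  // first use in each executor JVM.
  @transient private lazy val connection: Connection = {
    val factory = new MQConnectionFactory()
    factory.setHostName(conf.host)
    factory.setPort(conf.port)
    factory.setChannel(conf.channel)
    factory.setQueueManager(conf.queueManager)
    factory.setTransportType(WMQConstants.WMQ_CM_CLIENT)
    val conn = factory.createConnection(conf.user.orNull, conf.password.orNull)
    conn.start()
    conn
  }

  @transient private lazy val session: Session =
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE)

  // createOutputMq and submit would then build messages from `session` as before.
}

With this shape, the broadcast trick already used for kafkaSink would also work for the MQ service, since everything non-serializable is rebuilt lazily on the executors.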
