如何将运动流转换为新流?

问题描述

我有一个流结构,如下所示,我必须对其进行解码并使用 flink 作业将转换后的流处理为 kinesis。

{ “错误”:[ { “级别”:“错误”, “消息”:“错误:在枚举中找不到实例值(“类别”)“\n” } ],"failure_tstamp": "2021-02-22T18:30:06.276Z","line": "CwBkAAAADjE5Mi4xNDMuNTcuMj"

我试图读取流,但不确定如何在转换后将其转换为 strem 任何想法或想过如何做到这一点,当我将其设为 json 字符串但如何在流中进行时

private def loadProducerOrFail(config: JobConfig,jobName: String): 
FlinkKinesisProducer[Array[Byte]] = {
  KinesisProducer(config.output,() => new KinesisSerializer()) match {
    case Right(c) =>
      println("got the producer" + c)
      c
    case Left(e: Exception) =>
      throw new RuntimeException(
        s"Job $jobName Failed. Unable to connect to kinesis. Original error ${e.getMessage}",e
      )
  }
}

def main(args: Array[String]) {
 val env = StreamExecutionEnvironment.getExecutionEnvironment

val Config = new Properties()
consumerConfig.put(AWSConfigConstants.AWS_REGION,"us-east-1")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,"LATEST")  
lazy val inputStream: DataStream[Either[Array[Byte],Event]] = env.addSource {
loadConsumerOrFail(config,jobName)
}

inputStream.map{jsonstr =>
  val sJson = JsonMethods.parse((jsonstr.map(_.tochar)).mkString)
  val payloadJsonValue = sJson \ "line"
  implicit val formats = DefaultFormats
  val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"","")
  val payloadBytes = base64Decoder.decode(payvalue)

  val collectorPayload = new CollectorPayload
  thriftDeserializer.deserialize(collectorPayload,payloadBytes)

  val p = collectorPayload.toString.replace("\\","")

  val jval = p.substring(17,p.length()-1).replace(" ","").replace("\"","").split(",").asJson

  JsonMethods.compact(JsonMethods.render(jval)).addSink(loadProducerOrFail(config,jobName))
 }
 env.execute("Flink Kinesis")
 }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)