问题描述
我有一个流结构,如下所示,我必须对其进行解码并使用 flink 作业将转换后的流处理为 kinesis。
{ “错误”:[ { “级别”:“错误”, “消息”:“错误:在枚举中找不到实例值(“类别”)“\n” } ],"failure_tstamp": "2021-02-22T18:30:06.276Z","line": "CwBkAAAADjE5Mi4xNDMuNTcuMj"
我试图读取流,但不确定如何在转换后将其转换为 strem 任何想法或想过如何做到这一点,当我将其设为 json 字符串但如何在流中进行时
private def loadProducerOrFail(config: JobConfig,jobName: String):
FlinkKinesisProducer[Array[Byte]] = {
KinesisProducer(config.output,() => new KinesisSerializer()) match {
case Right(c) =>
println("got the producer" + c)
c
case Left(e: Exception) =>
throw new RuntimeException(
s"Job $jobName Failed. Unable to connect to kinesis. Original error ${e.getMessage}",e
)
}
}
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val Config = new Properties()
consumerConfig.put(AWSConfigConstants.AWS_REGION,"us-east-1")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,"LATEST")
lazy val inputStream: DataStream[Either[Array[Byte],Event]] = env.addSource {
loadConsumerOrFail(config,jobName)
}
inputStream.map{jsonstr =>
val sJson = JsonMethods.parse((jsonstr.map(_.tochar)).mkString)
val payloadJsonValue = sJson \ "line"
implicit val formats = DefaultFormats
val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"","")
val payloadBytes = base64Decoder.decode(payvalue)
val collectorPayload = new CollectorPayload
thriftDeserializer.deserialize(collectorPayload,payloadBytes)
val p = collectorPayload.toString.replace("\\","")
val jval = p.substring(17,p.length()-1).replace(" ","").replace("\"","").split(",").asJson
JsonMethods.compact(JsonMethods.render(jval)).addSink(loadProducerOrFail(config,jobName))
}
env.execute("Flink Kinesis")
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)