如何使用 Scala 将字符串作为 json 沉入 flink kinesis 流中?

问题描述

我们如何将 pojo 变量作为 json 生成到 kinesis flink 流中:

val inputStream: DataStream[Array[Byte]] = env.addSource {
  loadConsumerOrFail(config,jobName)
}
inputStream.print()
val transformedStream: DataStream[String] = inputStream.map { jsonstr =>
  val sJson = JsonMethods.parse((jsonstr.map(_.tochar)).mkString)
  val payloadJsonValue = sJson \ "line"

  implicit val formats = DefaultFormats
  val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"","")
  val payloadBytes = base64Decoder.decode(payvalue)

  val collectorPayload  = new CollectorPayload
  thriftDeserializer.deserialize(collectorPayload,payloadBytes)

  badStream(collectorPayload.ipAddress,collectorPayload.userAgent,collectorPayload.timestamp,collectorPayload.refererUri,collectorPayload.hostname,(sJson \ "failure_tstamp").extract[String],collectorPayload.body,collectorPayload.toString)

}
transformedStream.addSink(loadProducerOrFail(config,jobName))

这里将transformedStream沉入另一个kinesis但作为json但如何转换为json

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)