问题描述
我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保留在 S3 中。
背景
官方 S3-Sink-Connector 支持 Parquet 输出格式,但是:
对于此连接器,您必须使用带有 ParquetFormat 的 AvroConverter、ProtobufConverter 或 JsonSchemaConverter。尝试使用 JsonConverter(有或没有模式)会导致 NullPointerException 和 StackOverflowException。
问题陈述
所以,我正在寻找一种方法来读取最初以 JSON 格式编写的 Kafka 主题的消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入 S3 连接器,该连接器将以 Parquet 格式写入 S3 .
或者,鉴于主要要求(获取Kafka消息,将它在 S3 中作为 Parquet 文件)。谢谢!
PS:很遗憾,我目前无法更改这些 Kafka 消息的原始写入方式(例如将 JSON Schema serialization 与 Schema Discovery 一起使用)。
解决方法
通常,您的数据需要有一个架构,因为 Parquet 需要它(S3 parquet writer 将其转换为 Avro 作为中间步骤)
您可以考虑使用接受架构的 this Connect transform,并尝试应用 JSON 架构 - see tests。由于这会返回一个 Struct
对象,因此您可以尝试使用 JsonSchemaConverter
作为接收器的一部分。
但是如果您只是将随机的 JSON 数据扔到一个没有任何一致字段或值的主题中,那么您将很难应用任何架构