Kafka Connect:读取JSON序列化的Kafka消息,转换为Parquet格式并在S3中持久化

问题描述

我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保留在 S3 中。

背景

官方 S3-Sink-Connector 支持 Parquet 输出格式,但是:

对于此连接器,您必须使用带有 ParquetFormat 的 AvroConverter、ProtobufConverter 或 JsonSchemaConverter。尝试使用 JsonConverter(有或没有模式)会导致 NullPointerException 和 StackOverflowException。

还有JsonSchemaConverter throws out an error if the message was not written using JSON Schema serialization

问题陈述

所以,我正在寻找一种方法来读取最初以 JSON 格式编写的 Kafka 主题的消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入 S3 连接器,该连接器将以 Parquet 格式写入 S3 .

或者,鉴于主要要求(获取Kafka消息,将它在 S3 中作为 Parquet 文件)。谢谢!

PS:很遗憾,我目前无法更改这些 Kafka 消息的原始写入方式(例如将 JSON Schema serializationSchema Discovery 一起使用)。

解决方法

通常,您的数据需要有一个架构,因为 Parquet 需要它(S3 parquet writer 将其转换为 Avro 作为中间步骤)

您可以考虑使用接受架构的 this Connect transform,并尝试应用 JSON 架构 - see tests。由于这会返回一个 Struct 对象,因此您可以尝试使用 JsonSchemaConverter 作为接收器的一部分。

但是如果您只是将随机的 JSON 数据扔到一个没有任何一致字段或值的主题中,那么您将很难应用任何架构