Confluent Kafka-to-S3 sink 自定义 s3 命​​名,便于分区

问题描述

我正在使用 confluent 的 kafka-connect-s3 https://www.confluent.io/hub/confluentinc/kafka-connect-s3 将我的 kafka 主题备份到 s3。我希望能够使用 Athena 轻松查询这些数据,并对其进行正确分区以实现廉价/快速读取。

我想按(年/月/日/主题元组进行分区。我已经通过使用每日分区器 https://docs.confluent.io/kafka-connect-s3-sink/current/index.html#partitioning-records-into-s3-objects 解决了年/月/日部分。现在 year=YYYY/month=MM/day=DD 已进入路径,因此任何基于 hive 的查询都会按时优化/分区。查看 msck 解释,注意使用 userid=

的示例

https://docs.aws.amazon.com/athena/latest/ug/msck-repair-table.html

但是,根据这些文档 https://docs.confluent.io/kafka-connect-s3-sink/current/index.html#s3-object-names,我在路径中获得了 {topic},但无法将其修改为 topic={topic}。我可以将其添加到前缀中(而不是 env={env} 前缀将是 env={env}/topic={topic}),但这对于它下面的另一个唯一子目录 {topic} 似乎是多余的。

我还注意到主题名称在以 + 分隔的消息名称中(以及分区和起始偏移量)。

我的问题。 . .如何在我的路径中获取 topic={topic} 以便基于配置单元的查询自动创建该分区?或者我是否已经通过将它放在路径中(没有主题 =)或在消息名称中(同样没有主题 =)来免费获得它

解决方法

如何在我的路径中获取 topic={topic} 以便基于 hive 的查询自动创建该分区?

没有。

建议是为每个主题制作一个分区表,而不是让主题本身成为一个分区