问题描述
我在 DataFlow 上运行的 Apache Beam 管道中使用 KafkaIO 无界源。以下配置对我有用
Map<String,Object> kafkaConsumerConfig = new HashMap<String,Object>() {{
put("auto.offset.reset","earliest");
put("group.id","my.group.id");
}};
p.apply(KafkaIO.<String,String>read()
.withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
.withConsumerConfigUpdates(kafkaConsumerConfig)
.withTopic("my.topic")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withMaxnumRecords(10)
.withoutMetadata())
// do something
现在我有一个 protobuf 定义我的主题中的消息,我想用它来转换 Java 对象中的 kafka 记录。
以下配置不起作用,需要编码器:
p.apply(KafkaIO.<String,Bytes>read()
.withBootstrapServers("ip1:9092,ip3:9092")
.withConsumerConfigUpdates(kafkaConsumerConfig)
.withTopic("my.topic")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(BytesDeserializer.class)
.withMaxnumRecords(10)
.withoutMetadata())
不幸的是,我无法找出正确的 Value Deserializer + Coder 组合,也无法在文档中找到类似的示例。您有在 Apache Beam 中使用 Protobuf 和 Kafka 源代码的工作示例吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)