如何在 Apache Beam 中使用 protobuf 定义使用 Kafka 消息?

问题描述

我在 DataFlow 上运行的 Apache Beam 管道中使用 KafkaIO 无界源。以下配置对我有

Map<String,Object> kafkaConsumerConfig  = new HashMap<String,Object>() {{
  put("auto.offset.reset","earliest");
  put("group.id","my.group.id");
}};

p.apply(KafkaIO.<String,String>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withMaxnumRecords(10)
        .withoutMetadata())
// do something

现在我有一个 protobuf 定义我的主题中的消息,我想用它来转换 Java 对象中的 kafka 记录。

以下配置不起作用,需要编码器:

 p.apply(KafkaIO.<String,Bytes>read()
        .withBootstrapServers("ip1:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(BytesDeserializer.class)
        .withMaxnumRecords(10)
        .withoutMetadata())

不幸的是,我无法找出正确的 Value Deserializer + Coder 组合,也无法在文档中找到类似的示例。您有在 Apache Beam 中使用 Protobuf 和 Kafka 源代码的工作示例吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)