问题描述
我正在尝试使用kafka主题中的avro消息。我没有该消息的架构。下面是我的代码段。
public CthKafkaClient(String topic) {
super("CthKafkaConsumer",false);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONfig,CLIENT_ID);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONfig,"http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONfig,true);
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)