消费来自Kafka主题的Avro消息

问题描述

我正在尝试使用kafka主题中的avro消息。我没有该消息的架构。下面是我的代码段。

public CthKafkaClient(String topic) {
    super("CthKafkaConsumer",false);
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
    props.put(ConsumerConfig.GROUP_ID_CONfig,CLIENT_ID);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONfig,"http://localhost:8081");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,"org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,"io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONfig,true);
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

我可以连接到该主题,但无法提取消息,请帮助

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)