反序列化交易一致的消费者消息

问题描述

该问题也与我之前提出的问题有关。 IIDR CDC with Transaction Details to Kafka 仍未确定。

事务一致的使用者Api的示例AvroConsole在控制台中输出消息,并使用KafkaDeserializer将byte []反序列化为Object 示例TCC控制台链接https://www.ibm.com/support/knowledgecenter/SSTRGZ_11.4.0/com.ibm.cdcdoc.cdckafka.doc/tasks/kafkatccdev.html

我们尝试使用Java KafkaConsumer类执行相同的操作,并且能够使用byte []解串器将其打印出来。 ConsumerRecord键和consumerRecord值仍为序列化格式。

示例代码在下面

final Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONfig,APPLICATION_ID + "-" + UUID.randomUUID().toString());
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONfig,BOOTSTRAP_SERVER);
prop.put("schema.registry.url",SCHEMA_REGISTRY_URL);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
prop.setProperty(ConsumerConfig.GROUP_ID_CONfig,GROUP_ID + "-" + UUID.randomUUID().toString());
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,ByteArrayDeserializer.class.getName());
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,ByteArrayDeserializer.class.getName());


consumer = new KafkaConsumer<byte[],byte[]>(properties);
consumer.subscribe(Arrays.asList(TCC_TOPIC));
consumer.seekToBeginning(consumer.assignment());

Map<String,Object> keyDeserializerConfig = new HashMap<String,Object>();
keyDeserializerConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONfig,StreamConfigurator.SCHEMA_REGISTRY_URL);
keyDeserializerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONfig,"false");
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer ();
keyDeserializer.configure(keyDeserializerConfig,true);

Map<String,Object> valueDeserializerConfig = new HashMap<String,Object>();
valueDeserializerConfig.putAll(keyDeserializerConfig);
valueDeserializerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONfig,"false");
KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer ();
valueDeserializer.configure(valueDeserializerConfig,false);


while (true) {
    ConsumerRecords<byte[],byte[]> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<byte[],byte[]> record : records)
    {
        System.out.println(record.topic());
        System.out.println(record.partition());
        System.out.println(keyDeserializer.deserialize("key",record.key()));
        System.out.println(valueDeserializer.deserialize("value",record.value()));
        System.out.println(record.toString());
    }
}

我们在下面的代码行中遇到运行时错误

System.out.println(keyDeserializer.deserialize(“ key”,record.key()));

错误是 线程“主”中的异常org.apache.kafka.common.errors.SerializationException:反序列化ID -1的Avro消息时出错 引起原因:org.apache.kafka.common.errors.SerializationException:未知的魔术字节!

任何帮助将不胜感激。没有很多更好的文档或样本来为交易一致的消费者主题编写消费者代码

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)