How do I write correct Avro encoding/decoding to support schema evolution?

Problem description

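I need to convert an Avro object to bytes (to store it in a database) and then convert those bytes back into an Avro object. I want to write data with schema 1 and read it with schema 2; the two schemas start out identical. The problem is that if I add a new field to schema 2, I get an error. Also, if I change a field in schema 2 to a different "name" and a different "default", the reader still returns the previous "default" value. My guess is that the reader decodes the bytes by position rather than by the defined schema. Thanks a lot!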

Here is the code I use to serialize to Avro bytes and read them back.

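import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

// Customer and Customer_two are the classes generated from schema 1 and schema 2.
public class KafkaAvroProducerV1 {
    public static void main(String[] args) throws IOException {
        Customer.Builder customerBuilder = Customer.newBuilder();
        customerBuilder.setAge(30);
        customerBuilder.setFirstName("foo");
        customerBuilder.setLastName("foo");
        customerBuilder.setHeight(180f);
        customerBuilder.setWeight(90f);
        Customer customer = customerBuilder.build();

        // Serialize the record with schema 1 (the writer schema).
        DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(customer.getSchema());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(customer, encoder);
        encoder.flush();
        out.close();

        byte[] serializedBytes = out.toByteArray();
        // Arrays.toString prints the byte values rather than the array's identity hash.
        System.out.println("Sending message in bytes : " + Arrays.toString(serializedBytes));

        // Read the data with schema 1.
        DatumReader<Customer> userDatumReader = new SpecificDatumReader<>(Customer.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
        SpecificRecord datum = userDatumReader.read(null, decoder);
        System.out.println(datum);

        // Read the data with schema 2. The reader is given only Customer_two's schema here;
        // this is the read that fails once schema 2 gains a field.
        DatumReader<Customer_two> user2DatumReader = new SpecificDatumReader<>(Customer_two.class);
        Decoder decoder2 = DecoderFactory.get().binaryDecoder(serializedBytes, null);
        SpecificRecord datum2 = user2DatumReader.read(null, decoder2);
        System.out.println(datum2);
    }
}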

This is the error message:

Exception in thread "main" java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at com.avro.outbox.KafkaAvroProducerV1.main(KafkaAvroProducerV1.java:78)
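
The trace ends in `BinaryDecoder.readString` running out of input, which is consistent with the decoder expecting one more string field than the bytes contain. Avro's binary encoding stores a record's field values back to back in schema order, with no field names or tags, so a reader that is given only the new schema cannot tell where the old record ends. The following dependency-free Java sketch mimics that situation. It is a toy model, not Avro's wire format or API, and every name in it (`PositionalDecodingSketch`, `encode`, `decode`, `decodeResolved`) is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: NOT Avro's real wire format or API. All names are hypothetical.
class PositionalDecodingSketch {

    // Encode string fields back to back: a one-byte length, then the UTF-8 bytes.
    // As in Avro binary, no field name or tag is written.
    static byte[] encode(List<String> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String field : fields) {
            byte[] utf8 = field.getBytes(StandardCharsets.UTF_8);
            out.write(utf8.length); // toy limit: strings shorter than 128 bytes
            out.write(utf8, 0, utf8.length);
        }
        return out.toByteArray();
    }

    // Decode exactly fieldCount strings, trusting the caller's idea of the layout.
    static List<String> decode(byte[] bytes, int fieldCount) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        List<String> values = new ArrayList<>();
        for (int i = 0; i < fieldCount; i++) {
            byte[] utf8 = new byte[in.get()]; // BufferUnderflowException at end of data
            in.get(utf8);
            values.add(new String(utf8, StandardCharsets.UTF_8));
        }
        return values;
    }

    // Resolution in miniature: decode with the writer's layout, then append
    // defaults for the fields that exist only in the reader's layout.
    static List<String> decodeResolved(byte[] bytes, int writerFieldCount,
                                       List<String> readerOnlyDefaults) {
        List<String> values = decode(bytes, writerFieldCount);
        values.addAll(readerOnlyDefaults);
        return values;
    }

    public static void main(String[] args) {
        byte[] bytes = encode(Arrays.asList("foo", "bar")); // written with a 2-field layout
        try {
            decode(bytes, 3); // reader assumes 3 fields and runs off the end
        } catch (BufferUnderflowException e) {
            System.out.println("positional read failed: " + e);
        }
        System.out.println(decodeResolved(bytes, 2, Arrays.asList("size")));
    }
}
```

The failing call plays the role of reading schema-1 bytes with only schema 2, while `decodeResolved` stands in for a reader that knows both layouts and fills in the default for the added field.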

Here are the two schemas:

# schema 1
{
  "type": "record",
  "namespace": "com.example",
  "name": "Customer",
  "fields": [
    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
    { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
    { "name": "age", "type": "int", "doc": "Age at the time of registration" },
    { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
    { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
  ]
}

# schema 2
{
  "type": "record",
  "name": "Customer_two",
  ...
    "doc": "Field indicating if the user is enrolled in marketing emails" },
    { "name": "size", "default": "size", "doc": "size" }
  ]
}
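
One way to approach this, sketched under assumptions: Avro resolves differences between two schemas only when the reader is given both the schema the bytes were written with and the schema the application expects. `GenericDatumReader` and `SpecificDatumReader` each have a two-argument constructor that takes the writer schema followed by the reader schema. The example below uses the generic API and two cut-down, hypothetical stand-ins for the question's schemas (same record name, two shared fields, and a new `size` field with a default on the reader side) so that it runs without generated classes. The class and helper names are illustrative, not from the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

class SchemaEvolutionSketch {
    // Hypothetical, cut-down versions of the question's schemas.
    static final String COMMON_FIELDS =
        "{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}";

    static final Schema WRITER = parse(COMMON_FIELDS);
    // The reader adds a field; it must carry a default to be resolvable.
    static final Schema READER = parse(COMMON_FIELDS
        + ",{\"name\":\"size\",\"type\":\"string\",\"default\":\"size\"}");

    static Schema parse(String fields) {
        // A fresh parser per schema, since both define the same record name.
        return new Schema.Parser().parse(
            "{\"type\":\"record\",\"namespace\":\"com.example\",\"name\":\"Customer\","
            + "\"fields\":[" + fields + "]}");
    }

    // Serialize a record using its own schema (the writer schema).
    static byte[] encode(GenericRecord record) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize with BOTH schemas so Avro can resolve the differences.
    static GenericRecord decode(byte[] bytes, Schema writer, Schema reader) {
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return new GenericDatumReader<GenericRecord>(writer, reader).read(null, decoder);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GenericRecord customer = new GenericData.Record(WRITER);
        customer.put("first_name", "foo");
        customer.put("age", 30);

        GenericRecord evolved = decode(encode(customer), WRITER, READER);
        System.out.println(evolved); // includes "size" filled from the reader's default
    }
}
```

With the generated classes from the question, the equivalent would presumably be `new SpecificDatumReader<Customer_two>(Customer.getClassSchema(), Customer_two.getClassSchema())`. The Avro specification matches records and fields by name, so a record or field that has been renamed on the reader side may need an `aliases` entry naming the old one, and any field added on the reader side needs a `default`. Because the bytes alone do not identify their schema, the writer schema (or an identifier for it) has to be stored alongside the bytes in the database.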
