Problem description

I need to convert an Avro object to bytes (to store it in a database) and then convert the bytes back into an Avro object. I write the data with schema 1 and read it with schema 2; the two schemas are identical at the start. The problem: if I add a new column to schema 2, I get an error. Also, if I change a column in schema 2 to a different "name" and a different "default", it still reads the previous "default" value. I suspect the reader decodes the bytes by position rather than by the declared schema. Thanks in advance!

Here is the code I use for the Avro byte round-trip:
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

// Customer and Customer_two are the classes generated from the two schemas below.
public class KafkaAvroProducerV1 {

    public static void main(String[] args) throws IOException {
        // Build a Customer record (generated from schema 1).
        Customer.Builder customerBuilder = Customer.newBuilder();
        customerBuilder.setAge(30);
        customerBuilder.setFirstName("foo");
        customerBuilder.setLastName("foo");
        customerBuilder.setHeight(180f);
        customerBuilder.setWeight(90f);
        Customer customer = customerBuilder.build();

        // Serialize the record to bytes with schema 1.
        DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(customer.getSchema());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(customer, encoder);
        encoder.flush();
        out.close();
        byte[] serializedBytes = out.toByteArray();
        System.out.println("Sending message in bytes : " + serializedBytes);

        // Read the data back with schema 1 -- this works.
        DatumReader<Customer> userDatumReader = new SpecificDatumReader<>(Customer.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
        SpecificRecord datum = userDatumReader.read(null, decoder);
        System.out.println(datum);

        // Read the data back with schema 2 -- this is where it fails.
        DatumReader<Customer_two> user2DatumReader = new SpecificDatumReader<>(Customer_two.class);
        Decoder decoder2 = DecoderFactory.get().binaryDecoder(serializedBytes, null);
        SpecificRecord datum2 = user2DatumReader.read(null, decoder2);
        System.out.println(datum2);
    }
}
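For context on the suspicion above: Avro's binary encoding carries no field names or tags, only the values in the writer schema's declared field order, so the reader must be told which schema the bytes were written with and resolve it against its own. A minimal sketch of that change, assuming the generated classes expose the usual `getClassSchema()` static method:

```java
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public class SchemaResolutionSketch {
    // Decode bytes written with schema 1 (Customer) into a schema 2 (Customer_two) record.
    static Customer_two readWithResolution(byte[] serializedBytes) throws java.io.IOException {
        DatumReader<Customer_two> reader = new SpecificDatumReader<>(
                Customer.getClassSchema(),       // writer's schema: what the bytes were encoded with
                Customer_two.getClassSchema());  // reader's schema: what we want to decode into
        Decoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
        // Fields present only in the reader's schema (e.g. "size") must declare a
        // "default" in the schema, or this resolution step fails.
        return reader.read(null, decoder);
    }
}
```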
Here is the error message:
Exception in thread "main" java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.avro.outbox.KafkaAvroProducerV1.main(KafkaAvroProducerV1.java:78)
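The stack trace is consistent with positional decoding: the reader walks its own field layout over bytes laid out by the writer, misinterprets a value as a length header, and runs off the end of the buffer. A hypothetical plain-Java illustration of the same failure mode (no Avro involved, just `java.io`'s int/UTF framing):

```java
import java.io.*;

// Hypothetical demo: shows why decoding positionally with a drifted
// field layout fails with EOFException, the same class of error as above.
public class PositionalDecodeDemo {

    // Encode with a "writer schema" (int age, utf-string first_name),
    // then decode with a drifted "reader schema" (utf-string, utf-string).
    static String decodeWithDriftedSchema() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(30);      // writer field 1: age    -> bytes 00 00 00 1E
        out.writeUTF("foo");   // writer field 2: name   -> bytes 00 03 66 6F 6F
        out.close();

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        in.readUTF();          // misreads the int's leading zeros as a 0-length string
        try {
            return in.readUTF();   // misreads 0x001E (30) as a length, runs off the buffer
        } catch (EOFException e) {
            return "EOFException";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decodeWithDriftedSchema());
    }
}
```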
Here are the two schemas:
Schema 1:
{
  "type": "record",
  "namespace": "com.example",
  "name": "Customer",
  "fields": [
    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
    { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
    { "name": "age", "type": "int", "doc": "Age at the time of registration" },
    { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
    { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
  ]
}
Schema 2 (identical to schema 1 apart from the record name and the added "size" field):
{
  "type": "record",
  "namespace": "com.example",
  "name": "Customer_two",
  "fields": [
    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
    { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
    { "name": "age", "type": "int", "doc": "Age at the time of registration" },
    { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
    { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" },
    { "name": "size", "type": "string", "default": "size", "doc": "size" }
  ]
}
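As an aside, Avro ships a compatibility checker that can report up front whether data written with one schema is readable with another, which is useful when evolving schemas like these. A sketch using `SchemaCompatibility` (available in Avro 1.8+):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatCheckSketch {
    // Returns true if data written with writerSchema can be decoded with readerSchema.
    static boolean canRead(Schema writerSchema, Schema readerSchema) {
        return SchemaCompatibility
                .checkReaderWriterCompatibility(readerSchema, writerSchema)
                .getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
    }
}
```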