Problem description
I have two schemas:
Event.avsc:
{
  "type": "record",
  "namespace": "com.onemount.jobs.transform.schema.avro",
  "name": "Event",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "mtp_interest_submit", "type": ["null", "InterestSubmitParam"], "default": null }
  ]
}
InterestSubmitParam.avsc:
{
  "type": "record",
  "name": "InterestSubmitParam",
  "fields": [
    { "name": "interest", "type": { "type": "array", "items": "string" } }
  ]
}
I am consuming Avro messages from Confluent Kafka (with specific.avro.reader=false) and need to convert each GenericRecord to an ObjectNode. This is the result I get:
{
  "id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
  "mtp_interest_submit": {
    "com.onemount.jobs.transform.schema.avro.InterestSubmitParam": {
      "interest": ["fashion", "travel"]
    }
  }
}
But I expect it to be:
{
  "id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
  "mtp_interest_submit": {
    "interest": ["fashion", "travel"]
  }
}
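The extra level of nesting is not a bug in the conversion code. Per the Avro specification, the JSON encoding labels the selected branch of a union with its type name (fully qualified for named record types): the null branch encodes as plain null, while the record branch encodes as a single-key object. For the mtp_interest_submit field declared as ["null", "InterestSubmitParam"] in Event.avsc, the two possible encodings are:

"mtp_interest_submit": null

versus

"mtp_interest_submit": {
  "com.onemount.jobs.transform.schema.avro.InterestSubmitParam": { ... }
}

Any encoder that follows the Avro JSON encoding, including Avro's own JsonEncoder, produces this wrapping; dropping it requires resolving the union branch yourself or using a different JSON mapping.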
My current conversion code:

GenericRecord genericRecord = ...
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), outputStream);
    writer.write(genericRecord, encoder);
    encoder.flush();
    return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}
Thank you very much!
Solution
The problem was solved by using jackson-dataformat-avro: serialize the GenericRecord to plain Avro binary, then let Jackson decode that binary against the same schema — Jackson's Avro reader does not wrap union branches in their type name.
import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

ObjectMapper mapper = new ObjectMapper(new AvroFactory());
GenericRecord genericRecord = ...;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    // Serialize the GenericRecord to plain Avro binary.
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
    writer.write(genericRecord, encoder);
    encoder.flush();
    byte[] bytes = outputStream.toByteArray();
    // Decode with Jackson against the same schema; unions come back unwrapped.
    return mapper.readerFor(ObjectNode.class)
            .with(new AvroSchema(genericRecord.getSchema()))
            .readValue(bytes);
}
pom.xml:
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-avro</artifactId>
    <version>2.12.3</version>
</dependency>