How do I convert an Avro GenericRecord to JSON without the schema name being added?

Problem description

I have two schemas:

Event.avsc:

{
  "type": "record",
  "namespace": "com.onemount.jobs.transform.schema.avro",
  "name": "Event",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "mtp_interest_submit",
      "type": ["null", "InterestSubmitParam"],
      "default": null
    }
  ]
}

InterestSubmitParam.avsc:

{
  "type": "record",
  "name": "InterestSubmitParam",
  "fields": [
    {
      "name": "interest",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

I am consuming Avro messages from Confluent Kafka (with specific.avro.reader=false) and need to convert each GenericRecord to an ObjectNode. The result looks like this:

{
  "id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
  "mtp_interest_submit": {
    "com.onemount.jobs.transform.schema.avro.InterestSubmitParam": {
      "interest": ["fashion", "travel"]
    }
  }
}

But I want it to be:

{
  "id": "c8b76e58-9803-4c78-9f82-a185bda1cabf",
  "mtp_interest_submit": {
    "interest": ["fashion", "travel"]
  }
}

How can I fix this? Here is my converter code:

GenericRecord genericRecord = ...
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
    // Avro's own JSON encoder: this is what produces the wrapped union value
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(genericRecord.getSchema(), outputStream);
    writer.write(genericRecord, encoder);
    encoder.flush();

    return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
}

Thanks a lot!

Solution

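The problem was solved by using jackson-dataformat-avro. Avro's JsonEncoder follows the Avro JSON encoding, which wraps every non-null union value in an object keyed by the branch's type name. Writing the record as Avro binary and reading it back with Jackson avoids that wrapper: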

ObjectMapper mapper = new ObjectMapper(new AvroFactory());

GenericRecord genericRecord = ...;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    // Serialize the record to Avro binary using its own schema
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(genericRecord.getSchema());
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
    writer.write(genericRecord, encoder);
    encoder.flush();

    byte[] bytes = outputStream.toByteArray();

    // Decode the binary with Jackson's Avro reader, which yields plain JSON nodes
    return mapper.readerFor(ObjectNode.class)
            .with(new AvroSchema(genericRecord.getSchema()))
            .readValue(bytes);
}

pom.xml:

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-avro</artifactId>
        <version>2.12.3</version>
    </dependency>
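
If adding jackson-dataformat-avro is not an option, another possibility is to keep the JsonEncoder output and strip the union wrappers afterwards. The wrapper is always a single-key object whose key is the union branch's type name, so it can be removed by walking the parsed JSON tree. The following is only a minimal sketch of that idea, not part of the original answer: the class name UnionUnwrapper and the set of type names passed in are hypothetical, and plain java.util Map/List values stand in for whatever JSON tree type you actually use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: removes Avro union wrappers from a parsed JSON tree.
public class UnionUnwrapper {

    // Rebuilds the tree, replacing every single-key object whose key is one of
    // the given union branch names with that key's value.
    public static Object unwrap(Object node, Set<String> unionTypeNames) {
        if (node instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) node;
            if (map.size() == 1) {
                Map.Entry<?, ?> only = map.entrySet().iterator().next();
                if (unionTypeNames.contains(only.getKey())) {
                    // Drop the wrapper and continue with the wrapped value
                    return unwrap(only.getValue(), unionTypeNames);
                }
            }
            Map<String, Object> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : map.entrySet()) {
                result.put(String.valueOf(e.getKey()), unwrap(e.getValue(), unionTypeNames));
            }
            return result;
        }
        if (node instanceof List) {
            List<Object> result = new ArrayList<>();
            for (Object item : (List<?>) node) {
                result.add(unwrap(item, unionTypeNames));
            }
            return result;
        }
        // Scalars and null are returned unchanged
        return node;
    }

    public static void main(String[] args) {
        String typeName = "com.onemount.jobs.transform.schema.avro.InterestSubmitParam";

        // Same shape as the unwanted output in the question
        Map<String, Object> param = new LinkedHashMap<>();
        param.put("interest", List.of("fashion", "travel"));
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put(typeName, param);
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("id", "c8b76e58-9803-4c78-9f82-a185bda1cabf");
        event.put("mtp_interest_submit", wrapper);

        System.out.println(unwrap(event, Set.of(typeName)));
        // prints {id=c8b76e58-9803-4c78-9f82-a185bda1cabf, mtp_interest_submit={interest=[fashion, travel]}}
    }
}
```

One caveat with this approach: a genuine record that happens to have exactly one field named like a union branch would also be unwrapped, so the set of names should be limited to types that really appear inside unions in your schemas.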