问题描述
我正在尝试使用 avro 序列化将数据发送到 kafka 主题。整个想法是将 Json 数据转换为 avro 并发送到 kafka。我没有在两者之间使用任何 Pojo,只是直接处理 json 并转换为通用数据。我实现了一个自定义的 AvroSerializer。下面显示了教程 here 中使用的序列化程序代码。
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);
@Override
public void close() {
// No-op
}
@Override
public void configure(Map<String,?> arg0,boolean arg1) {
// No-op
}
@Override
public byte[] serialize(String topic,T data) {
try {
byte[] result = null;
if (data != null) {
LOGGER.debug("data='{}'",data);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder =
EncoderFactory.get().binaryEncoder(byteArrayOutputStream,null);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
datumWriter.write(data,binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
result = byteArrayOutputStream.toByteArray();
LOGGER.debug("serialized data='{}'",DatatypeConverter.printHexBinary(result));
}
return result;
} catch (IOException ex) {
throw new SerializationException(
"Can't serialize data='" + data + "' for topic='" + topic + "'",ex);
}
}
}
当我使用这个序列化器向 kafka 主题发送通用记录时,它会出现以下错误。
org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.specific.SpecificRecord
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) ~[avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[avro-1.8.2.jar:1.8.2]
当我尝试使用以下代码将通用记录转换为特定记录时。
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(data.getSchema());
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out,null);
writer.write(data,encoder);
encoder.flush();
byte[] avroData = out.toByteArray();
out.close();
SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<SpecificRecord>(SpecificRecord.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(avroData,null);
SpecificRecord myCustomrecord = reader.read(null,decoder);
2021-03-11 21:14:04.774 ERROR 10464 --- [nio-8088-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing Failed; nested exception is org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.specific.SpecificRecord] with root cause
org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.specific.SpecificRecord
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) ~[avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[avro-1.8.2.jar:1.8.2]
属性如下所示。
Properties properties = new Properties();
// normal producer
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("acks","all");
properties.setProperty("retries","10");
// avro part
properties.setProperty("key.serializer",StringSerializer.class.getName());
properties.setProperty("value.serializer",AvroSerializer.class.getName());
有人可以建议一种方法来克服这个问题。我已经参考了大多数类似的 stackoverflow 问题,但似乎没有一个对我有用。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)