问题描述
我正在尝试使用Quarkus从主题中读取事件。该主题可以包含不同类型的事件。所有事件都使用AVRO格式,因此我有一个架构注册表,可以在其中读取事件的已删除架构。我使用avro-maven-plugin将架构编译为java类。
假设我们有两种具有以下模式的事件:
Event1
{
"field1": "string"
}
Event2
{
"field2": "string"
}
在我的应用程序中,我仅对其中之一感兴趣。
public class Consumer {
@Incoming("test-in")
public CompletionStage<Void> read(
IncomingKafkaRecord<String,Event1> data) {
System.out.println(data.getKey());
System.out.println(data.getPayload());
return data.ack();
}
}
此代码不但可以打印所有事件,而且不仅可以打印Event1
类型的事件。
当我尝试从事件data.getField1()
获取数据时,我得到了CastException
。
java.lang.classCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.test.Event1 (org.apache.avro.generic.GenericData$Record is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1aa7ecca; com.test.Event1 is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1144a55a)
有一种方法可以从主题中读取某些事件类型,其中几种事件类型适用于AVRO?
解决方法
一种可能的解决方案(有点麻烦)是使用SpecificRecord
作为有效负载类型,并在应用程序配置文件上为进入事件设置 specific.avro.reader=true
。
public class Consumer {
@Incoming("test-in")
public CompletionStage<Void> read(
IncomingKafkaRecord<String,SpecificRecord> data) {
String schemaFullName = data.getPayload().getSchema().getFullName();
if (schemaFullName.equals(Event1.class.getName())) {
System.out.println(((Event1) data.getPayload()).getField1());
}
return data.ack();
}
}
选择事件后,可以将SpecificRecord
强制转换为正确的已编译Avro事件类。