与Kafka,Quarkus和Avro相同主题中的几种事件类型

问题描述

我正在尝试使用Quarkus从主题中读取事件。该主题可以包含不同类型的事件。所有事件都使用AVRO格式,因此我有一个架构注册表,可以在其中读取事件的已删除架构。我使用avro-maven-plugin将架构编译为java类

假设我们有两种具有以下模式的事件:

Event1

{
   "field1": "string"
}

Event2

{
   "field2": "string"
}

在我的应用程序中,我仅对其中之一感兴趣。

 public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> read(
        IncomingKafkaRecord<String,Event1> data) {
        System.out.println(data.getKey());
        System.out.println(data.getPayload());
        return data.ack();
    }
}

代码不但可以打印所有事件,而且不仅可以打印Event1类型的事件。

当我尝试从事件data.getField1()获取数据时,我得到了CastException

java.lang.classCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.test.Event1 (org.apache.avro.generic.GenericData$Record is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1aa7ecca; com.test.Event1 is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1144a55a)

有一种方法可以从主题中读取某些事件类型,其中几种事件类型适用于AVRO?

解决方法

一种可能的解决方案(有点麻烦)是使用SpecificRecord作为有效负载类型,并在应用程序配置文件上为进入事件设置 specific.avro.reader=true

 public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> read(
        IncomingKafkaRecord<String,SpecificRecord> data) {
            String schemaFullName = data.getPayload().getSchema().getFullName();
            if (schemaFullName.equals(Event1.class.getName())) {
                System.out.println(((Event1) data.getPayload()).getField1());
            }
        return data.ack();
    }
}

选择事件后,可以将SpecificRecord强制转换为正确的已编译Avro事件类。