使用Apache Camel进行多态Json编组

问题描述

我正在重构骆驼路线,以期变得更加通用。 (如果有任何可能的bean注入解决方案,我也会使用spring boot)

from(fromKafka)
                .routeId("Rest Models")
                .removeHeaders("*")
                .aggregate(new GroupedBodyAggregationStrategy())
                .constant(true)
                .completionTimeout(batchingInterval)
                .process(new listofJsonToJsonArray())
                .unmarshal().json(JsonLibrary.Jackson,InputArrayPojo.class)
                .enrich("seda:rest",mergeRestResult)

处理器listofJsonToJsonArray()接受kafka消息的json字符串表示形式,并以逗号分隔的所有内容通过外部的{[ ]}进行连接。

因此,InputArrayPojo.class是来自kafka的对象数组的包装。我需要将对象捆绑在一起,以便在扩展中最小化到REST接口。所包含的对象的格式为InputPojo.class(实际上只是一个模式,还执行一些基本的数据质量检查)

我需要一种生成InputPojo.class的方法,以便对于我们的新工作,我们可以运行相同的路线,但是提供不同的InputPojo.class。

我尝试应用多态性并为InputPojo创建一个接口,但是在尝试构造该接口时会遇到错误

@JsonSubTypes({
        @JsonSubTypes.Type(value=InputPojo.class,name = "online")
})
public interface InputPojoInterface {
}

我也尝试了一些参数化,但是我也没有运气,因为它不会应用bean的构造函数,因此不存在任何方法

我也包括

com.fasterxml.jackson.databind.exc.InvalidDeFinitionException - Cannot construct instance of `InputPojoInterface` (no Creators,like default construct,exist): abstract types either need to be mapped to concrete types,have custom deserializer,or contain additional type @R_219_4045@ion
 at [Source: (ByteArrayInputStream); line: 1,column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
        "data"
})
public class InputArrayPojo{

    @JsonProperty("data")
    private List<InputPojo> data = null;
    @JsonIgnore
    private Map<String,Object> additionalProperties = new HashMap<String,Object>();

    @JsonProperty("data")
    public List<InputPojo> getData() {
        return data;
    }

    @JsonProperty("data")
    public void setData(List<InputPojo> data) {
        this.data = data;
    }

    @JsonAnyGetter
    public Map<String,Object> getAdditionalProperties() {
        return this.additionalProperties;
    }

    @JsonAnySetter
    public void setAdditionalProperty(String name,Object value) {
        this.additionalProperties.put(name,value);
    }

}

扩充还需要实现某种类型的生成逻辑

@Override
    public Exchange aggregate(Exchange oldExchange,Exchange newExchange) {

        List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
        List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
        List<ModelResultPojo> outputList = new ArrayList<>();

        for (int i = 0; i < originalMessages.size(); ++i) {
            ModelResultPojo output = new ModelResultPojo();
            IngestionOutPojo raw = originalMessages.get(i);
            PredictionPojo enrich = enrichmentMessages.get(i);
            /*
            enrichment logic to create modelResult
            */
            outputList.add(modelResult)
    }
    newExchange.getIn().setBody(outputList);
    return newExchange
}

解决方法

我最终通过以下方法提出了解决方案:

解组为默认类型:Map (不指定类,它骆驼解组为Map

之后,我编写了一个实现处理器的抽象类。在此处理器中,我获取Map,然后将抽象的editFields()函数应用于Map。

因此,我现在可以通过Map而不是POJO对业务逻辑进行多态处理。