问题描述
我正在重构骆驼路线,以期变得更加通用。 (如果有任何可能的bean注入解决方案,我也会使用spring boot)
from(fromKafka)
.routeId("Rest Models")
.removeHeaders("*")
.aggregate(new GroupedBodyAggregationStrategy())
.constant(true)
.completionTimeout(batchingInterval)
.process(new listofJsonToJsonArray())
.unmarshal().json(JsonLibrary.Jackson,InputArrayPojo.class)
.enrich("seda:rest",mergeRestResult)
处理器listofJsonToJsonArray()
接受kafka消息的json字符串表示形式,并以逗号分隔的所有内容通过外部的{[ ]}
进行连接。
因此,InputArrayPojo.class是来自kafka的对象数组的包装。我需要将对象捆绑在一起,以便在扩展中最小化到REST接口。所包含的对象的格式为InputPojo.class(实际上只是一个模式,还执行一些基本的数据质量检查)
我需要一种生成InputPojo.class的方法,以便对于我们的新工作,我们可以运行相同的路线,但是提供不同的InputPojo.class。
我尝试应用多态性并为InputPojo创建一个接口,但是在尝试构造该接口时会遇到错误。
@JsonSubTypes({
@JsonSubTypes.Type(value=InputPojo.class,name = "online")
})
public interface InputPojoInterface {
}
我也尝试了一些参数化,但是我也没有运气,因为它不会应用bean的构造函数,因此不存在任何方法。
我也包括了
com.fasterxml.jackson.databind.exc.InvalidDeFinitionException - Cannot construct instance of `InputPojoInterface` (no Creators,like default construct,exist): abstract types either need to be mapped to concrete types,have custom deserializer,or contain additional type @R_219_4045@ion
at [Source: (ByteArrayInputStream); line: 1,column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"data"
})
public class InputArrayPojo{
@JsonProperty("data")
private List<InputPojo> data = null;
@JsonIgnore
private Map<String,Object> additionalProperties = new HashMap<String,Object>();
@JsonProperty("data")
public List<InputPojo> getData() {
return data;
}
@JsonProperty("data")
public void setData(List<InputPojo> data) {
this.data = data;
}
@JsonAnyGetter
public Map<String,Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name,Object value) {
this.additionalProperties.put(name,value);
}
}
扩充还需要实现某种类型的生成逻辑
@Override
public Exchange aggregate(Exchange oldExchange,Exchange newExchange) {
List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
List<ModelResultPojo> outputList = new ArrayList<>();
for (int i = 0; i < originalMessages.size(); ++i) {
ModelResultPojo output = new ModelResultPojo();
IngestionOutPojo raw = originalMessages.get(i);
PredictionPojo enrich = enrichmentMessages.get(i);
/*
enrichment logic to create modelResult
*/
outputList.add(modelResult)
}
newExchange.getIn().setBody(outputList);
return newExchange
}
解决方法
我最终通过以下方法提出了解决方案:
解组为默认类型:Map
之后,我编写了一个实现处理器的抽象类。在此处理器中,我获取Map,然后将抽象的editFields()函数应用于Map。
因此,我现在可以通过Map而不是POJO对业务逻辑进行多态处理。