问题描述
我对Java有点陌生,因此我很乐意为您提供有关在Kafka谓词中处理多个条件的建议。我有以下代码,能够基于动态输入进行动态过滤,并避免出现以下“ if / elseifs”?我使代码简单而愚蠢,以便于更轻松地了解我要执行的操作。我正在尝试了解推荐/应用/添加由于用户输入而可能需要多少个过滤器的方法。我还想知道是否可以根据用户输入(等于/包含/ ...)使用比较运算符。
public Topology buildTopology(Properties envProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String inputTopic = envProps.getProperty("input.topic.name");
final String streamsOutputTopic = envProps.getProperty("streams.approved.topic.name");
final String tableOutputTopic = envProps.getProperty("table.output.topic.name");
final Serde<String> stringSerde = Serdes.String();
final KStream<String,ClientTask> stream = builder.stream(inputTopic,Consumed.with(stringSerde,StreamsSerdes.ClientTask()));
String[] Filters = {"State=Work In Progress","Priorit=High"};
final KStream<String,ClientTask> filter_stream = stream.filter(IsGoodtoGoFilter(Filters));
filter_stream.to(streamsOutputTopic,Produced.with(stringSerde,StreamsSerdes.ClientTask()));
return builder.build();
}
public static Predicate<String,ClientTask> IsGoodtoGoFilter(String[] Filters) {
ArrayList<String> GetColumn = new ArrayList<>();
ArrayList<String> GetValue = new ArrayList<>();
for (String FilterColumn: Filters) {
String[] ColumnAndValue = FilterColumn.split("=");
GetColumn.add(ColumnAndValue[0]);
GetValue.add(ColumnAndValue[1]);
}
if (GetColumn.size() == 1) {
return (k,v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0));
}
else if (GetColumn.size() == 2) {
return (k,v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0)) &&
v.get(GetColumn.get(1)).toString().equals(GetValue.get(1));
}
return (k,v) -> v.getStatus().equals("Open"); // replace by empty predicate later
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)