Kafka流-基于条件数量的动态连接谓词

问题描述

我对Java有点陌生,因此我很乐意为您提供有关在Kafka谓词中处理多个条件的建议。我有以下代码,能够基于动态输入进行动态过滤,并避免出现以下“ if / elseifs”?我使代码简单而愚蠢,以便于更轻松地了解我要执行的操作。我正在尝试了解推荐/应用/添加由于用户输入而可能需要多少个过滤器的方法。我还想知道是否可以根据用户输入(等于/包含/ ...)使用比较运算符。

    public Topology buildTopology(Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = envProps.getProperty("input.topic.name");
        final String streamsOutputTopic = envProps.getProperty("streams.approved.topic.name");
        final String tableOutputTopic = envProps.getProperty("table.output.topic.name");
        final Serde<String> stringSerde = Serdes.String();
        final KStream<String,ClientTask> stream = builder.stream(inputTopic,Consumed.with(stringSerde,StreamsSerdes.ClientTask()));    
        String[] Filters = {"State=Work In Progress","Priorit=High"};
        final KStream<String,ClientTask> filter_stream = stream.filter(IsGoodtoGoFilter(Filters));
        filter_stream.to(streamsOutputTopic,Produced.with(stringSerde,StreamsSerdes.ClientTask()));           
        return builder.build();
    } 
    public static Predicate<String,ClientTask> IsGoodtoGoFilter(String[] Filters) {
        ArrayList<String> GetColumn = new ArrayList<>();
        ArrayList<String> GetValue = new ArrayList<>();
        for (String FilterColumn: Filters)       {
            String[] ColumnAndValue = FilterColumn.split("=");
            GetColumn.add(ColumnAndValue[0]);
            GetValue.add(ColumnAndValue[1]);
        }
        if (GetColumn.size() == 1) {
            return (k,v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0));
        }
        else if (GetColumn.size() == 2) {
            return (k,v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0)) &&
            v.get(GetColumn.get(1)).toString().equals(GetValue.get(1));
        }
        return (k,v) -> v.getStatus().equals("Open"); // replace by empty predicate later
    }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)