Problem using DataSet's collect function in Apache Flink

Problem description

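I am trying to compute the Adamic-Adar index of relationships in a social network, based on the graph below. I set up the edges, vertices, DataSet and Graph using the Apache Flink Gelly library. Here is my code: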



    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.library.similarity.AdamicAdar;
    import org.apache.flink.types.NullValue;
    import org.apache.flink.types.StringValue;
    
    import java.util.List;
    
    public class MyMain {
    
        public static void main(String[] args) {
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<Tuple2<StringValue, StringValue>> edgeDataSet = env.
                    readCsvFile(String.valueOf(MyMain.class.getResource("dataset/edges.csv"))).
                    types(StringValue.class, StringValue.class);
            Graph<StringValue, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edgeDataSet, env);
    
            List<AdamicAdar.Result<StringValue>> list = null;
            try {
                list = graph.run(new AdamicAdar<StringValue, NullValue, NullValue>()).collect();
            } catch (Exception e) {
                e.printStackTrace();
            }
            }
    
            System.out.println(list.get(0));
        }
    }

This is the error I get:



    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(Lorg/apache/flink/configuration/ReadableConfig;Lorg/apache/flink/configuration/ConfigOption;Lorg/apache/flink/util/function/FunctionWithException;)Ljava/util/List;
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:75)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
        at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98)
        at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:962)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:878)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:862)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
        at MyMain.main(MyMain.java:23)
    
    Process finished with exit code 1

Here is also part of the edges.csv file I am using:



    5 122
    5 156
    5 158
    5 169
    5 180
    5 187
    5 204
    5 213
    5 235
    5 315
    5 316
    6 89
    6 95
    6 147
    6 219
    6 319
    7 22

Here, 5 316 means that the vertex with id 5 is connected to the vertex with id 316, which defines an edge.

This is my pom.xml file.

Solution

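A NoSuchMethodError usually indicates a version mismatch among your dependencies. Looking at your pom.xml, you appear to be pulling in different versions of the Flink-related libraries. In the stack trace, a class under org.apache.flink.client calls ConfigUtils.decodeListFromConfig in org.apache.flink.configuration and cannot find it, which is what you would expect if those two classes come from jars built for different Flink releases. Aligning all Flink dependencies on a single version should resolve it.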
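One way to confirm the mismatch from inside the program is to print which jar each of the classes in the stack trace is loaded from. The following is a minimal sketch, not part of the original post: the class name `WhichJar` and the helper `locate` are made up for illustration, and it only uses the JDK, so it runs even when Flink is not on the classpath.

```java
import java.security.CodeSource;

public class WhichJar {

    /** Returns where the named class was loaded from, or a short note if that is unknown. */
    public static String locate(String className) {
        try {
            // initialize=false: only look the class up, do not run its static initializers.
            Class<?> clazz = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes have no code source.
            if (source == null || source.getLocation() == null) {
                return "(bootstrap / no code source)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        String[] names = {
            "org.apache.flink.configuration.ConfigUtils",
            "org.apache.flink.client.cli.ExecutionConfigAccessor",
            "org.apache.flink.api.java.ExecutionEnvironment"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the printed jar file names carry different version numbers, that points to the mismatch. Running `mvn dependency:tree -Dincludes=org.apache.flink` should show the same information from the build side.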

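By the way, I also noticed that your .csv file is not comma-separated. You may want to simplify the program and verify that edgeDataSet actually contains 2-tuples rather than single values.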
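The delimiter point can be checked without Flink. The sketch below, with a made-up class `DelimiterCheck` and helper `fieldCount`, splits a few of the sample lines from the question on a comma and on a space and counts the resulting fields. It is only an illustration of why a comma-based reader would see one field per line, not a reproduction of Flink's parser.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class DelimiterCheck {

    /** Counts the fields a line yields when split on the given literal delimiter. */
    public static int fieldCount(String line, String delimiter) {
        // Pattern.quote treats the delimiter literally; limit -1 keeps trailing empty fields.
        return line.split(Pattern.quote(delimiter), -1).length;
    }

    public static void main(String[] args) {
        // Lines copied from the edges.csv excerpt in the question.
        List<String> sample = Arrays.asList("5 122", "5 316", "6 89");
        for (String line : sample) {
            System.out.println("'" + line + "'"
                    + " comma-split fields: " + fieldCount(line, ",")
                    + ", space-split fields: " + fieldCount(line, " "));
        }
    }
}
```

In the Flink program itself, the CSV reader splits on commas by default, so chaining `fieldDelimiter(" ")` after `readCsvFile(...)` and before `types(...)` is the likely adjustment for this file, assuming the fields are separated by a single space as in the excerpt.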