在 flink 可查询状态 [version-1.7.2] 中没有得到查询响应

问题描述

我正在查询 127.0.1.1:9069 上的 flink 集群的代理服务器,但没有得到查询响应。我通过在 9000 端口上创建服务器来计算所有输入数字的总和。此外,我将总和存储在值状态中。

Flink 作业:

private transient ValueState<Tuple2<String,Long>> sum;

@Override
public void flatMap(Tuple2<Long,Long> input,Collector<Tuple2<String,Long>> out) throws Exception {
    if (input.f1==-1){
        sum.clear();
        return;
    }
    Tuple2<String,Long> currentSum = sum.value();
    currentSum.f1 += input.f1;


    sum.update(currentSum);
    System.out.println("Current Sum: "+(sum.value().f1)+"\nCurrent Count: "+(sum.value().f0));
        out.collect(new Tuple2<>("sum",sum.value().f1));
}

@Override
public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<String,Long>> descriptor =
            new ValueStateDescriptor<>(
                    "sum",// the state name
                    Typeinformation.of(new TypeHint<Tuple2<String,Long>>() {}),Tuple2.of("sum",0L)); // default value of the state,if nothing was set
    sum = getRuntimeContext().getState(descriptor);

}

inp.flatMap(new FlatMapFunction<String,Tuple2<Long,Long>>() {
        @Override
        public void flatMap(String inpstr,Collector<Tuple2<Long,Long>> out) throws Exception{

            for (String word : inpstr.split("\\s")) {
                try {
                    if(word.equals("quit")){
                        throw new QuitValueState( "Stoppping!!!",hostname,port);
                    }
                    if(word.equals("clear")){
                        word="-1";
                    }
                    out.collect(Tuple2.of(1L,Long.valueOf(word)));
                }
                catch ( NumberFormatException e) {
                    System.out.println("Enter valid number: "+e.getMessage());
                }catch (QuitValueState ex){
                    System.out.println("Quitting!!!");
                }
            }
        }
    }).keyBy(0).flatMap(new StreamingJob())
            .keyBy(0).asQueryableState("query-name");

在 flink 集群上,我可以在 127.0.1.1:9069 看到代理服务器

客户端:

public static void main(String[] args) throws IOException,InterruptedException,Exception {
    QueryableStateClient client = new QueryableStateClient("127.0.1.1",9069);

    System.out.println("Querying on "+args[0]);
    JobID jobId = JobID.fromHexString(args[0]);
    ValueStateDescriptor<Tuple2<String,Typeinformation.of(new TypeHint<Tuple2<String,Long>>() {
                    }));


    CompletableFuture<ValueState<Tuple2<String,Long>>> resultFuture =
            client.getKvstate(jobId,"query-name","sum",BasicTypeInfo.STRING_TYPE_INFO,descriptor);
    System.out.println(resultFuture);
    resultFuture.thenAccept(response -> {
        try {
            Tuple2<String,Long> res = response.value();
            System.out.println("Queried sum value: " + res);
        } catch (Exception e) {
            e.printstacktrace();
        }
        System.out.println("Exiting future ...");
    });
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)