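Problem description
I am querying the proxy server of a Flink cluster at 127.0.1.1:9069, but I get no response to the query. The job reads numbers from a server I created on port 9000 and computes the sum of all input numbers. The sum is stored in a ValueState.
Flink job: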
    // Keyed state holding the running sum as ("sum", total)
    private transient ValueState<Tuple2<String, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<String, Long>> out) throws Exception {
        // -1 is the sentinel produced for the "clear" command
        if (input.f1 == -1) {
            sum.clear();
            return;
        }
        Tuple2<String, Long> currentSum = sum.value();
        currentSum.f1 += input.f1;
        sum.update(currentSum);
        System.out.println("Current Sum: " + (sum.value().f1) + "\nCurrent Count: " + (sum.value().f0));
        out.collect(new Tuple2<>("sum", sum.value().f1));
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<String, Long>> descriptor =
            new ValueStateDescriptor<>(
                "sum", // the state name
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                Tuple2.of("sum", 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
    inp.flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
        @Override
        public void flatMap(String inpstr, Collector<Tuple2<Long, Long>> out) throws Exception {
            for (String word : inpstr.split("\\s")) {
                try {
                    if (word.equals("quit")) {
                        throw new QuitValueState("Stopping!!!", hostname, port);
                    }
                    // "clear" is mapped to the sentinel -1, which resets the state downstream
                    if (word.equals("clear")) {
                        word = "-1";
                    }
                    out.collect(Tuple2.of(1L, Long.valueOf(word)));
                } catch (NumberFormatException e) {
                    System.out.println("Enter valid number: " + e.getMessage());
                } catch (QuitValueState ex) {
                    System.out.println("Quitting!!!");
                }
            }
        }
    }).keyBy(0).flatMap(new StreamingJob())
      .keyBy(0).asQueryableState("query-name");
On the Flink cluster, I can see that the proxy server is running at 127.0.1.1:9069.
Client:
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
        System.out.println("Querying on " + args[0]);
        JobID jobId = JobID.fromHexString(args[0]);
        // This declaration was garbled in the post; the state name "sum" is assumed to match the job
        ValueStateDescriptor<Tuple2<String, Long>> descriptor =
            new ValueStateDescriptor<>("sum", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
            }));
        CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
            client.getKvState(jobId, "query-name", "sum", BasicTypeInfo.STRING_TYPE_INFO, descriptor);
        System.out.println(resultFuture);
        resultFuture.thenAccept(response -> {
            try {
                Tuple2<String, Long> res = response.value();
                System.out.println("Queried sum value: " + res);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Exiting future ...");
        });
    }
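One thing worth ruling out in the client above: `thenAccept` only runs when the future completes successfully, so a failed query prints nothing, and `main` returns without waiting for the response. The following is a minimal sketch in plain Java of waiting on a `CompletableFuture` with a timeout so that both a result and a failure become visible. It does not use the Flink API: `QueryWaitSketch`, `awaitResult`, and the `supplyAsync` stand-in that returns 42 are hypothetical names and values used only for illustration, and whether this explains the missing response in the original job is an assumption, not a confirmed diagnosis.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class QueryWaitSketch {

    // Blocks until the future completes or the timeout elapses.
    // Failures and timeouts are rethrown instead of being silently dropped.
    static <T> T awaitResult(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the query", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Query failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No response within " + timeoutMillis + " ms", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the asynchronous state query:
        // the value is produced on another thread after a short delay.
        CompletableFuture<Long> resultFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 42L;
        });

        // Unlike thenAccept, whenComplete is also invoked on failure.
        resultFuture.whenComplete((value, error) -> {
            if (error != null) {
                System.err.println("Query failed: " + error);
            }
        });

        // Keep main alive until the response (or an error) arrives.
        Long sum = awaitResult(resultFuture, 5_000);
        System.out.println("Queried sum value: " + sum);
    }
}
```

In the real client, the same pattern would wrap the future returned by the state query. If the wait then reports an exception, its message may indicate whether the key, the state name, or the job ID is the part that does not match.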
Solution
No working solution to this problem has been found yet.