问题描述
我正在尝试在 coprocessFunction 中初始化一个“ListState”,但是它一直抛出这个错误 ""java.lang.NullPointerException: 键控状态只能用于'键控流'""。
这是我使用的代码 sinnpet
def double_char(str): # function double_char takes argument str
double_char = "" # create an empty string assign to variable double_char. In python you put "" or '' around text that you want as a string (text).
for i in range(len(str)): # iterate from 0 to length of string
double_char += str[i]*2 # append character at position i from str and duplicate it
return double_char # return double_char
抛出这个错误的那一行是那行
dataStream1.keyBy(ele->ele.f1).connect(dataStream2).process(
new coprocessFunction<Tuple2<Long,String>,String,Object>() {
ListState stream1ListState;
@Override
public void open(Configuration parameters) throws Exception {
LOG.info("inside coprocess Function Constructor");
ListStateDescriptor<Tuple2<Long,String>> listStateDescriptor = new
ListStateDescriptor<Tuple2<Long,String>>("type1",Typeinformation.of(new TypeHint<Tuple2<Long,String>>(){}));
stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void processElement1(Tuple2<Long,String> longStringTuple2,Context context,Collector<Object> collector) throws Exception {
LOG.info("inside stream1 processor");
LOG.info(longStringTuple2.toString());
collector.collect(longStringTuple2);
}
@Override
public void processElement2(String s,Collector<Object> collector)
throws Exception {
LOG.info("inside stream2 processor");
LOG.info(s);
}
}).print();
其余的错误日志跟踪如下
stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
解决方法
该错误意味着您只能在 ListState
内部使用 ValueState
、KeyedCoProcessFunction
等,而这反过来只能在两个流都被键入时使用。因此,如果第二个流是普通流并且可以通过某个键进行键控,那么您可以这样做,否则您可能需要参考 here 中描述的广播状态模式。