初始化 Sate 会导致此错误“java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream'”

问题描述

我正在尝试在 coprocessFunction 中初始化一个“ListState”,但是它一直抛出这个错误 ""java.lang.NullPointerException: 键控状态只能用于'键控流'""。

这是我使用的代码 sinnpet

def double_char(str):            # function double_char takes argument str
    double_char = ""             # create an empty string assign to variable double_char. In python you put "" or '' around text that you want as a string (text).
    for i in range(len(str)):    # iterate from 0 to length of string
        double_char += str[i]*2  # append character at position i from str and duplicate it
    return double_char           # return double_char

抛出这个错误的那一行是那行

    dataStream1.keyBy(ele->ele.f1).connect(dataStream2).process(
        new coprocessFunction<Tuple2<Long,String>,String,Object>() {
            ListState stream1ListState;
            
            @Override
            public void open(Configuration parameters) throws Exception {
                LOG.info("inside coprocess Function Constructor");
                ListStateDescriptor<Tuple2<Long,String>> listStateDescriptor = new
                    ListStateDescriptor<Tuple2<Long,String>>("type1",Typeinformation.of(new TypeHint<Tuple2<Long,String>>(){}));
                stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
            }
            
            @Override
            public void processElement1(Tuple2<Long,String> longStringTuple2,Context context,Collector<Object> collector) throws Exception {
                LOG.info("inside stream1 processor");
                LOG.info(longStringTuple2.toString());
                collector.collect(longStringTuple2);
            }
            
            @Override
            public void processElement2(String s,Collector<Object> collector)
                throws Exception {
                LOG.info("inside stream2 processor");
                LOG.info(s);
            }
        }).print();

其余的错误日志跟踪如下

stream1ListState = getRuntimeContext().getListState(listStateDescriptor);

解决方法

该错误意味着您只能在 ListState 内部使用 ValueStateKeyedCoProcessFunction 等,而这反过来只能在两个流都被键入时使用。因此,如果第二个流是普通流并且可以通过某个键进行键控,那么您可以这样做,否则您可能需要参考 here 中描述的广播状态模式。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...