org.apache.http.protocol.HttpRequestExecutor@6ea2bc93 is not serializable: the object probably contains or references non-serializable fields

Problem description

Flink consumes from a Kafka topic and sinks the records into ClickHouse.

The error is thrown at dataStreamSource.addSink(sinkInstance);

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: org.apache.http.protocol.HttpRequestExecutor@6ea2bc93 is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
    at org.data.dataflow.KafkaToFlink.main(KafkaToFlink.java:36)
Caused by: java.io.NotSerializableException: org.apache.http.protocol.HttpRequestExecutor
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 11 more
public class KafkaToFlink {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment().setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "10.227.89.202:9092");
    properties.setProperty("group.id", "test-consumer-group");
    SingleOutputStreamOperator<String> dataStreamSource = streamExecutionEnvironment
        .addSource(new FlinkKafkaConsumer<>(
            "my-topic", new CustomKafkaDeserializationSchema(), properties).setStartFromEarliest()
        ).map((MapFunction<String, String>) s -> {
          Thread.sleep(ThreadLocalRandom.current().nextInt(0, 500));
          return s;
        });
    SinkFunction<String> sinkInstance = new FlinkToCK("xxx", "8123", "default", "");
    dataStreamSource.addSink(sinkInstance);
    streamExecutionEnvironment.execute("flink consume kafka topic");
  }
}

class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<String> {

  @Override
  public boolean isEndOfStream(String s) {
    return false;
  }

  @Override
  public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
    return consumerRecord.toString();
  }

  @Override
  public TypeInformation<String> getProducedType() {
    return BasicTypeInfo.STRING_TYPE_INFO;
  }
}
class FlinkToCK extends RichSinkFunction<String> {

  FlinkToCK(String host, String port, String user, String pwd) throws SQLException {
    super();
    this.connection_ = DriverManager.getConnection("jdbc:clickhouse://" + host + ":" + port, user, pwd);
    this.statement_ = connection_.createStatement();
    statement_.execute("CREATE DATABASE IF NOT EXISTS test");
  }

  @Override
  public void invoke(String value, SinkFunction.Context context) throws SQLException {
    statement_.execute("INSERT INTO test VALUES ('" + value + "')");
  }

  @Override
  public void close() throws SQLException {
    connection_.close();
  }

  private final Connection connection_;
  private final Statement statement_;
}

Solution

This happens because Connection is not serializable: it holds a reference to an HttpRequestExecutor, and Flink's ClosureCleaner fails when it tries to serialize the sink instance while building the job graph (see the stack trace above). What you can try is declaring the connection and the statement transient and, instead of assigning them in the constructor, overriding open(), where you open the database connection and create a new Statement. That also means the two fields can no longer be final.
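A minimal sketch of the reworked sink under those assumptions (same Flink version, ClickHouse JDBC URL, and INSERT statement as the original); only the lifecycle of the connection and statement changes:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

class FlinkToCK extends RichSinkFunction<String> {

  private final String host;
  private final String port;
  private final String user;
  private final String pwd;

  // Created in open() on the task manager, so they are never serialized with the sink.
  private transient Connection connection;
  private transient Statement statement;

  FlinkToCK(String host, String port, String user, String pwd) {
    this.host = host;
    this.port = port;
    this.user = user;
    this.pwd = pwd;
  }

  @Override
  public void open(Configuration parameters) throws SQLException {
    connection = DriverManager.getConnection("jdbc:clickhouse://" + host + ":" + port, user, pwd);
    statement = connection.createStatement();
    statement.execute("CREATE DATABASE IF NOT EXISTS test");
  }

  @Override
  public void invoke(String value, Context context) throws SQLException {
    statement.execute("INSERT INTO test VALUES ('" + value + "')");
  }

  @Override
  public void close() throws SQLException {
    if (statement != null) {
      statement.close();
    }
    if (connection != null) {
      connection.close();
    }
  }
}

Because open() and close() run on the task manager after the sink has been deserialized, only the plain String constructor arguments need to be serializable, which satisfies the ClosureCleaner check that was failing.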