问题描述
Flink 消费 Kafka，下沉（sink）到 ClickHouse。
dataStreamSource.addSink(sinkInstance);
处的错误
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: org.apache.http.protocol.HttpRequestExecutor@6ea2bc93 is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
at org.data.dataflow.KafkaToFlink.main(KafkaToFlink.java:36)
Caused by: java.io.NotSerializableException: org.apache.http.protocol.HttpRequestExecutor
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
... 11 more
/**
 * Entry point: consumes records from a Kafka topic and sinks each record into ClickHouse.
 *
 * <p>Pipeline: Kafka source -> map (simulated random processing delay) -> ClickHouse sink.
 * Parallelism is pinned to 1.
 */
public class KafkaToFlink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
            .getExecutionEnvironment().setParallelism(1);

        // Kafka consumer configuration. NOTE(review): broker address is hard-coded;
        // consider reading it from args or configuration.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.227.89.202:9092");
        properties.setProperty("group.id", "test-consumer-group");

        SingleOutputStreamOperator<String> dataStreamSource = streamExecutionEnvironment
            .addSource(new FlinkKafkaConsumer<>(
                "my-topic", new CustomKafkaDeserializationSchema(), properties).setStartFromEarliest()
            ).map((MapFunction<String, String>) s -> {
                // Simulate a variable per-record processing cost (0-499 ms).
                Thread.sleep(ThreadLocalRandom.current().nextInt(0, 500));
                return s;
            });

        // The sink must be serializable: FlinkToCK only stores connection parameters
        // and opens the JDBC connection lazily in open().
        SinkFunction<String> sinkInstance = new FlinkToCK("xxx", "8123", "default", "");
        dataStreamSource.addSink(sinkInstance);

        streamExecutionEnvironment.execute("flink consume kafka topic");
    }
}
/**
 * Deserializes Kafka records into Strings.
 *
 * <p>NOTE(review): {@code deserialize} returns {@code consumerRecord.toString()}, i.e. the
 * record's metadata representation, not the value payload. If only the message value is
 * wanted, decode {@code consumerRecord.value()} instead — confirm intent with the author.
 */
class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String s) {
        // Unbounded stream: never signal end-of-stream.
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        return consumerRecord.toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Fixed typo: the class is TypeInformation, not Typeinformation.
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
class FlinkToCK extends RichSinkFunction<String> {
FlinkToCK(String host,String port,String user,String pwd) throws sqlException {
super();
this.connection_ = DriverManager.getConnection("jdbc:clickhouse://" + host + ":" + port,user,pwd);
this.statement_ = connection_.createStatement();
statement_.execute("CREATE DATABASE IF NOT EXISTS test");
}
@Override
public void invoke(String value,SinkFunction.Context context) throws sqlException {
statement_.execute("INSERT INTO test VALUES ('${value}')");
}
public void close() throws sqlException {
connection_.close();
}
private final Connection connection_;
private final Statement statement_;
}
解决方法
这是因为 `Connection` 不可序列化：它内部持有对 `HttpRequestExecutor` 的引用。解决办法是把连接（`Connection`）和语句（`Statement`）字段声明为 `transient`，并且不在构造函数中创建它们，而是覆盖 `RichSinkFunction` 的 `open()` 方法，在其中建立数据库连接并创建新的 `Statement`。这样一来，这两个字段就不能再声明为 `final` 了。