While a Flink job is running, it is hard to avoid exceptions that terminate the service. Since Flink's strength lies in processing real-time data, a plain restart can leave some metrics incorrect and lose data: for example, if a job counting events over the last hour dies after half an hour, restarting it can only begin the count again from scratch. Flink solves this with state: state can be kept in memory, on a file system, or in an embedded database (RocksDB), and once persisted, the job can resume computation after a failure.
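For reference, a minimal sketch of the three built-in state backends in the Flink 1.x API used below (class name and paths are placeholders; the RocksDB backend additionally requires the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. JobManager heap: simplest, state size is limited, suitable for local testing
        env.setStateBackend(new MemoryStateBackend());
        // 2. working state on the TaskManager heap, checkpoints written to a file system
        env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
        // 3. state in embedded RocksDB on local disk, checkpoints to a file system;
        //    handles state larger than memory and supports incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true));
    }
}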
The following example uses Kafka as the data source and counts how often each message occurs, using keyBy, process, and the rich function's open() method to initialize and update the state.
Example
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class FlinkStateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // take a checkpoint every 5 s
        env.enableCheckpointing(5000);
        // exactly-once guarantees; switch the mode according to your requirements
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // minimum pause between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // abort a checkpoint that takes longer than one minute
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // at most one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // keep checkpoint files after the job is cancelled, so the job can be restored from them
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // where the state backend stores checkpoints
        env.setStateBackend(new FsStateBackend("file:///Users/guands/dev/checkpoints"));
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink");
        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), kafkaProps);
        // start reading from the group's committed offsets in Kafka
        kafka.setStartFromGroupOffsets();
        // commit offsets back to Kafka whenever a checkpoint completes, so no data is lost on restart
        kafka.setCommitOffsetsOnCheckpoints(true);
        DataStreamSource<String> dataStream = env.addSource(kafka);
        dataStream
                .keyBy((KeySelector<String, String>) s -> s)
                .process(new KeyedProcessFunction<String, String, String>() {
                    private ValueState<Long> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // initialize the state handle; after a restore it returns the saved count for the current key
                        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("test_count", Long.class));
                    }

                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                        long count = 0;
                        if (countState.value() != null) {
                            count = countState.value();
                        }
                        count++;
                        countState.update(count);
                        System.out.println("count: " + count);
                        out.collect(value);
                    }
                }).print();
        env.execute();
    }
}
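Note that countState is initialized in open() rather than in the constructor: open() runs once per task after the runtime context becomes available, and on recovery the descriptor name "test_count" is what maps the persisted state back to this operator, so countState.value() already returns the restored count for the current key.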
After the job has been running for a while, checkpoint files appear under the configured directory.
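For an externalized checkpoint under the FsStateBackend, the layout typically looks like this (the job ID and checkpoint number will differ per run):

/Users/guands/dev/checkpoints/
└── 7be830af951177a89815c8ab450b3c41/   (job ID)
    ├── chk-6/
    │   └── _metadata                   (restore entry point)
    ├── shared/
    └── taskowned/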
Restart
Resubmit the job, pointing the -s option of flink run at the _metadata file of the checkpoint to restore from:
flink run -s /Users/guands/dev/checkpoints/7be830af951177a89815c8ab450b3c41/chk-6/_metadata -c com.....FlinkStateTest flink-1.0-SNAPSHOT.jar
Produce a few more Kafka messages (for example with the console producer shown below) and the count keeps accumulating on top of the previous value. The restart worked!
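A convenient way to generate test messages is the console producer that ships with Kafka (on newer Kafka versions the flag is --bootstrap-server instead of --broker-list):

kafka-console-producer.sh --broker-list localhost:9092 --topic test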