Flink中的检查点随时间增加

问题描述

关于this question的汇总,我仍然不清楚为什么Flink作业的检查点会随着时间的增长而增长,目前,在连续运行约7天的时间里,这些检查点从未达到稳定状态。 目前,我正在使用Flink 1.10版本,FS State Backend,因为我的工作负担不起使用RocksDB的延迟成本。

查看检查点在7天内的演变情况:

enter image description here

假设我对所有有状态运算符的状态TTL进行了这样的配置,一个小时甚至可能超过一个小时,有时甚至一天:

public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot().build();

我担心,所有进入状态的对象都将在到期时间后清理,因此应减小检查点的大小,并且正如我们期望每天或多或少的数据量一样。

另一方面,我们有一条流量曲线,该流量曲线在一天的某些小时内有更多的传入数据,但深夜流量下降了,应清理掉所有过期状态的对象,从而导致检查点大小应该减少流量,直到流量再次上升。

让我们看一下一个用例的代码示例:

DataStream<Event> stream = addSource(source);
KeyedStream<Event,String> keyedStream = stream.filter((FilterFunction<Event>) event ->
                    apply filters here;))
                    .name("Events filtered")
                    .keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())


public class MyFlatMapFunction extends RichFlatMapFunction<Event,Event>{
private final MapStateDescriptor<String,Event> descriptor = new MapStateDescriptor<>("prev_state",String.class,Event.class);
private MapState<String,Event> prevIoUsstate;

@Override
    public void open(Configuration parameters) {
        /*ttlConfig described above*/
        descriptor.enableTimetoLive(ttlConfig);
        prevIoUsstate = getRuntimeContext().getMapState(descriptor);
    }

@Override
    public void flatMap(Event event,Collector<Event> collector) throws Exception {
      final String key = event.rType.equals("something") ? event.id1 : event.id2;
      Event prevIoUs = prevIoUsstate.get(key);
      if(prevIoUs != null){
        /*something done here*/
      }else /*something done here*/
        prevIoUsstate.put(key,prevIoUs);
        collector.collect(prevIoUs);
 }
}

这些或多或少是用例的结构,另一些使用Windows(时间窗口或会话窗口)

问题:

  • 在这里做什么错了?
  • 状态到期时是否进行清理,并且这种情况与其余用例相同?
  • 如果检查点工作不正常,什么可以帮助我修复检查点大小?
  • 这种行为正常吗?

亲切的问候!

解决方法

在这段代码中,您似乎只是在写回已经存在的状态,这仅用于重置TTL计时器。这也许可以解释为什么状态没有过期。

Event previous = previousState.get(key);
if (previous != null) {
  /*something done here*/
} else
  previousState.put(key,previous);

似乎您应该使用ValueState而不是MapState。 ValueState有效地提供了分片键/值存储,其中键是用于在keyBy中对流进行分区的键。 MapState为您提供每个键的嵌套映射,而不是单个值。但是,由于您在flatMap中使用的是与最初用于密钥流相同的密钥,因此,密钥分区的ValueState似乎就是您所需要的。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...