Flink作业中每1:45H之后CPU负载上升

问题描述

在上一个问题(here)中,我问为什么我的Flink 1.10.1作业会导致运行一小时后cpu负载开始增加,却从未下降,因此,我开始使用VisualVM监视作业,jprofiler和MemoryAnalyzer,我发现了一些错误,这些错误与不必要的对象创建,HashMaps和其他一些固定的问题有关。然后,内存消耗和cpu负载都不错,但是,cpu负载几乎每隔1:​​45H就开始表现出类似“ hills”的状态。

我具有与以前相同的执行图,但我也将其放在这里

DataStream<Event> from_source = rabbitConsumer
                .flatMap(new RabbitMQConsumer())
                .assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source 
                    .filter(new NullidsFilterFunction())
KeyedStream<String,Event> keyed_stream = data_stream.keyby(k->k.id);

/*one-> stateful operator: ProcessWindowFunction*/
data_stream.map(new EventCount(x))
            .keyBy(k -> new Date(k.timestamp.getTime()).toString())
            .window(TumblingEventTimeWindows.of(Time.ninutes(30)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*two-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*three*/
keyed_stream.filter(new FilterFunction())
            .map(new MapClass())
            .addSink(new SinkFuncion());

/*four*/
pw_keyed_stream = data_stream
            .filter(new FilterFunction())
            .map(new MapClass())
            .keyBy(k -> k.id+ new Date(k.timestamp.getTime()));
  pw_keyed_stream.addSink(new SinkFuncion());

/*five-> stateful operator: ProcessWindowFunction*/
pw_keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
          .process(new MyProcessWindowFunction())
          .addSink(new SinkFuncion());

/*Six-> stateful operator with 4 ConcurrentHashMap into the state: RichFlatMapFunction*/
keyed_stream.flatmap(new FlatMapFunction())
            .addSink(new SinkFuncion());

/*seven-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*eight-> stateful operator: RichFlatMapFunction*/
data_stream.filter(new FilterFunction())
           .keyBy(k -> k.type.equals("something") ? k.one : k.two)
           .flatmap(new FlatMapFunction())
           .addSink(new SinkFuncion());

更改:

  • 从JDK8升级到JDK9(两个JDK的行为相同)
  • 从CMS更改为G1收集器(两个GC的山丘行为相同)

请参见下图:

enter image description here

我希望当流量下降时,负载也会下降,但是即使负载很低,这些山丘也会停留在那儿。

再次感谢。 亲切的问候!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...