问题描述
在上一个问题(here)中,我问为什么我的Flink 1.10.1作业会导致运行一小时后cpu负载开始增加,却从未下降,因此,我开始使用VisualVM监视作业,jprofiler和MemoryAnalyzer,我发现了一些错误,这些错误与不必要的对象创建,HashMaps和其他一些固定的问题有关。然后,内存消耗和cpu负载都不错,但是,cpu负载几乎每隔1:45H就开始表现出类似“ hills”的状态。
我具有与以前相同的执行图,但我也将其放在这里:
DataStream<Event> from_source = rabbitConsumer
.flatMap(new RabbitMQConsumer())
.assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source
.filter(new NullidsFilterFunction())
KeyedStream<String,Event> keyed_stream = data_stream.keyby(k->k.id);
/*one-> stateful operator: ProcessWindowFunction*/
data_stream.map(new EventCount(x))
.keyBy(k -> new Date(k.timestamp.getTime()).toString())
.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*two-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*three*/
keyed_stream.filter(new FilterFunction())
.map(new MapClass())
.addSink(new SinkFuncion());
/*four*/
pw_keyed_stream = data_stream
.filter(new FilterFunction())
.map(new MapClass())
.keyBy(k -> k.id+ new Date(k.timestamp.getTime()));
pw_keyed_stream.addSink(new SinkFuncion());
/*five-> stateful operator: ProcessWindowFunction*/
pw_keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*Six-> stateful operator with 4 ConcurrentHashMap into the state: RichFlatMapFunction*/
keyed_stream.flatmap(new FlatMapFunction())
.addSink(new SinkFuncion());
/*seven-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*eight-> stateful operator: RichFlatMapFunction*/
data_stream.filter(new FilterFunction())
.keyBy(k -> k.type.equals("something") ? k.one : k.two)
.flatmap(new FlatMapFunction())
.addSink(new SinkFuncion());
更改:
- 从JDK8升级到JDK9(两个JDK的行为相同)
- 从CMS更改为G1收集器(两个GC的山丘行为相同)
我希望当流量下降时,负载也会下降,但是即使负载很低,这些山丘也会停留在那儿。
再次感谢。 亲切的问候!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)