问题描述
首要:
- 我是Flink的新手(了解原理并能够创建我需要的任何基本流工作)
- 我使用Kinesis Analytics运行Flink作业,默认情况下,它使用间隔为1分钟的增量检查点。
- Flink作业正在使用FlinkKinesisConsumer和自定义反序列化器(将字节反序列化为一个简单的Java对象,在整个作业中使用)从Kinesis流中读取事件
我想存档的只是简单地计算过去24小时内有多少个ENTITY_ID / FOO和ENTITY_ID / BAR事件。重要的是,此计数应尽可能准确,这就是为什么我使用此Flink功能而不是自己在5分钟的翻滚窗口上进行累加总和的原因。 我还希望能够从一开始就具有“总计”事件的计数(而不仅仅是过去24小时),因此我也要在结果中输出过去5分钟的事件计数,以便后期处理应用可以只需花费这5分钟的数据并计算总和即可。 (此计数不一定是准确的,如果发生中断并且我丢失了一些计数也可以)
现在,这项工作一直很好,直到上周我们的流量激增了10倍以上。从那以后,Flink变成了香蕉。 检查点大小开始逐渐从约500MB增长到20GB,检查点时间大约需要1分钟,并且随着时间的推移逐渐增加。 该应用程序开始出现故障,并且永远无法完全恢复,并且事件迭代器的寿命没有回升,因此没有新的事件被消耗。
由于我是Flink的新手,所以我不确定我进行滑动计数的方式是否完全未优化或完全错误。
这是代码关键部分的一小段:
源(MyJsonDeserializationSchema扩展了AbstractDeserializationSchema并仅读取字节并创建Event对象):
SourceFunction<Event> source =
new FlinkKinesisConsumer<>("input-kinesis-stream",new MyJsonDeserializationSchema(),kinesisConsumerConfig);
输入事件,简单的java pojo,将在Flink运算符中使用:
public class Event implements Serializable {
public String entityId;
public String entityType;
public String entityName;
public long eventTimestamp = System.currentTimeMillis();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> eventsstream = kinesis
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
@Override
public long extractTimestamp(Event event) {
return event.eventTimestamp;
}
})
DataStream<Event> fooStream = eventsstream
.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return "foo".equalsIgnoreCase(event.entityType);
}
})
DataStream<Event> barStream = eventsstream
.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return "bar".equalsIgnoreCase(event.entityType);
}
})
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table fooTable = tEnv.fromDataStream("fooStream,entityId,entityName,entityType,eventTimestame.rowtime");
tEnv.registerTable("Foo",fooTable);
Table barTable = tEnv.fromDataStream("barStream,eventTimestame.rowtime");
tEnv.registerTable("Bar",barTable);
Table slidingFooCountTable = fooTable
.window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
.groupBy("entityId,minuteWindow")
.select("concat(concat(entityId,'_'),entityName) as slidingFooId,entityid as slidingFooEntityid,entityName as slidingFooEntityName,entityType.count as slidingFooCount,minuteWindow.rowtime as slidingFooMinute");
Table slidingBarCountTable = barTable
.window(Slide.over("24.hout").every("5.minute").on("eventTimestamp").as("minuteWindow"))
.groupBy("entityId,entityName) as slidingBarId,entityid as slidingBarEntityid,entityName as slidingBarEntityName,entityType.count as slidingBarCount,minuteWindow.rowtime as slidingBarMinute");
Table tumblingFooCountTable = fooTable
.window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
.groupBy("entityid,minuteWindow")
.select("concat(concat(entityName,entityName) as tumblingFooId,entityId as tumblingFooEntityId,entityNamae as tumblingFooEntityName,entityType.count as tumblingFooCount,minuteWindow.rowtime as tumblingFooMinute");
Table tumblingBarCountTable = barTable
.window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
.groupBy("entityid,entityName) as tumblingBarId,entityId as tumblingBarEntityId,entityNamae as tumblingBarEntityName,entityType.count as tumblingBarCount,minuteWindow.rowtime as tumblingBarMinute");
Table aggregatedTable = slidingFooCountTable
.leftOuterJoin(slidingBarCountTable,"slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
.leftOuterJoin(tumblingFooCountTable,"slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
.leftOuterJoin(tumblingFooCountTable,"slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
.select("slidingFooMinute as timestamp,slidingFooCreativeId as entityId,slidingFooEntityName as entityName,slidingFooCount,slidingBarCount,tumblingFooCount,tumblingBarCount");
DataStream<Result> result = tEnv.toAppendStream(aggregatedTable,Result.class);
result.addSink(sink); // write to an output stream to be picked up by a lambda function
如果有更多使用Flink的经验的人可以对我的计算方式发表评论,我将不胜感激?我的代码是否设计过度?是否有更好,更有效的方法来计数24小时内的事件?
我在Stackoverflow @DavidAnderson的某个地方读过,建议使用地图状态创建我们自己的滑动窗口,并按时间戳将事件切片。 但是我不确定这意味着什么,也没有找到任何代码示例来显示它。
解决方法
您正在其中创建许多窗口。如果您要创建一个大小为24h且滑动时间为5分钟的滑动窗口,这意味着其中将有很多打开的窗口,因此您可能希望您在给定日期收到的所有数据都将在考虑一下,至少要有一个窗口。因此,可以确定的是,检查点的大小和时间会随着数据本身的增长而增长。
要想得到答案,可以重写代码。您需要在此处提供更多详细信息,以了解您到底想达到什么目的。