Problem description
I am new to Flink (the AWS Kinesis Data Analytics service), and I'm having trouble implementing EventTimeSessionWindows. Here is my code:
```java
// Use the event's own "timestamp" field (assumed to be epoch millis) as event time,
// allowing events to arrive up to 5 seconds out of order.
SingleOutputStreamOperator<Deserializedobj> deserializedobjSingleOutputStreamOperator = kinesisstream
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Deserializedobj>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.timestamp)));

// One session per anonymousId; a session closes after 30 minutes (event time) without events.
SingleOutputStreamOperator<String> sessionStream = deserializedobjSingleOutputStreamOperator
        .keyBy("anonymousId")
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .process(new ProcessWindowFunction<Deserializedobj, String, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple tuple, Context context, Iterable<Deserializedobj> iterable, Collector<String> out) throws Exception {
                // Count the events that fell into this session.
                long count = 0;
                for (Deserializedobj obj : iterable) {
                    count++;
                }
                out.collect("Window: " + tuple.getField(0) + " " + context.window() + "count: " + count);
            }
        });
```
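Basically, I want to build session windows (30-minute gap) over the user activity flowing through a Kinesis stream. My events are not guaranteed to arrive in order, because they are fired on the source side. I want to use the "timestamp" field of my Deserializedobj so that each event lands in the correct session. Before a window is processed, I want to wait a few seconds (4) so that late events are not lost.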
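The windowing behaviour described above can be illustrated without Flink at all. The plain-Java sketch below is only a simulation under stated assumptions: timestamps are epoch milliseconds, all events belong to one hypothetical anonymousId, and the names `SessionWindowSketch`, `sessions`, `watermark` and `fires` are made up for illustration. It follows the merging rule used for session windows (each event opens a window `[ts, ts + gap)`, and windows that overlap or touch are merged) and the timing of a bounded-out-of-orderness watermark (largest timestamp seen, minus the delay, minus 1 ms). Note that the question mentions waiting 4 seconds while the code passes `Duration.ofSeconds(5)`; the sketch uses 4.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of event-time session windows for ONE key.
// Hypothetical names; it mimics the merging and firing rules, not Flink's real code.
public class SessionWindowSketch {

    /** A merged session: half-open interval [start, end) in epoch millis, plus its event count. */
    public static final class Session {
        public final long start;
        public final long end;
        public final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    /**
     * Each event at time ts opens a window [ts, ts + gapMs); windows that overlap
     * or touch are merged into one session. Arrival order does not matter.
     */
    public static List<Session> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // event time, not arrival order, decides the session
        List<Session> result = new ArrayList<>();
        long start = 0;
        long end = 0;
        int count = 0;
        for (long ts : sorted) {
            if (count > 0 && ts <= end) {
                // [ts, ts + gapMs) intersects the current session: extend it
                end = Math.max(end, ts + gapMs);
                count++;
            } else {
                if (count > 0) {
                    result.add(new Session(start, end, count)); // close the previous session
                }
                start = ts;
                end = ts + gapMs;
                count = 1;
            }
        }
        if (count > 0) {
            result.add(new Session(start, end, count));
        }
        return result;
    }

    /** Bounded-out-of-orderness watermark: largest timestamp seen minus the delay, minus 1 ms. */
    public static long watermark(long maxTimestampSeen, long outOfOrdernessMs) {
        return maxTimestampSeen - outOfOrdernessMs - 1;
    }

    /** An event-time window fires once the watermark reaches its last millisecond (end - 1). */
    public static boolean fires(Session s, long watermark) {
        return watermark >= s.end - 1;
    }

    public static void main(String[] args) {
        long gapMs = 30 * 60 * 1000L; // 30-minute inactivity gap
        long delayMs = 4 * 1000L;     // ~4 s allowance for late events
        // Hypothetical event times (ms) for one anonymousId, listed in arrival order.
        long[] arrivals = {0L, 600_000L, 300_000L, 5_000_000L};
        for (Session s : sessions(arrivals, gapMs)) {
            // Smallest "largest timestamp seen" whose watermark fires this session: end + delay.
            long neededTs = s.end + delayMs;
            System.out.println("session [" + s.start + ", " + s.end + ") count=" + s.count
                    + " fires once an event with ts >= " + neededTs + " has been seen");
        }
    }
}
```

In this simplified single-stream model, a session is emitted only after later events push the watermark past the session's end plus the delay; the out-of-order event at 300 000 ms still lands in the first session because grouping is done by event time.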
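Currently I get the following error, which gives no clue about what is happening internally. I captured it from the Exceptions tab of the Apache Flink dashboard.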
```
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d7aff90[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3aa696a6[Wrapped task = org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer@fc8fb39]] rejected from java.util.concurrent.ThreadPoolExecutor@520831ff[Shutting down, pool size = 8, active threads = 8, queued tasks = 0, completed tasks = 14]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:473)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:345)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
```
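The trace shows `KinesisDataFetcher.runFetcher` submitting a `ShardConsumer` to a `ThreadPoolExecutor` that is already in the "Shutting down" state, and the pool's `AbortPolicy` rejecting it. The JDK-only sketch below (the class name `RejectedAfterShutdownDemo` is hypothetical) reproduces just that mechanism: once `shutdown()` has been called, any new submission throws `RejectedExecutionException`. One plausible reading, not confirmed by the question, is that the source's pool was being shut down because the job was already failing or being cancelled for some other reason, in which case the TaskManager logs may contain an earlier, more informative exception.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// JDK-only demo: a ThreadPoolExecutor that has been shut down rejects new tasks,
// and the default AbortPolicy turns that into RejectedExecutionException.
public class RejectedAfterShutdownDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    public static boolean submitAfterShutdownIsRejected() {
        // newFixedThreadPool builds a ThreadPoolExecutor with the default AbortPolicy.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.shutdown(); // stop accepting new work (already-queued work would still run)
        try {
            pool.submit(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            System.out.println("Rejected: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + submitAfterShutdownIsRejected());
    }
}
```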
Solution
No working solution has been found yet.