Flink Kinesis Data Analytics event-time session window

Problem description

I'm new to Flink (running on the AWS Kinesis Data Analytics service), and I'm having trouble implementing EventTimeSessionWindows. Here is my code:

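import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Use the event's own "timestamp" field as event time; tolerate 5 seconds of out-of-orderness.
SingleOutputStreamOperator<Deserializedobj> deserializedobjSingleOutputStreamOperator = kinesisstream
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Deserializedobj>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.timestamp)));

// Group events per anonymousId into sessions separated by 30 minutes of inactivity.
SingleOutputStreamOperator<String> sessionStream = deserializedobjSingleOutputStreamOperator
        .keyBy("anonymousId")
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .process(new ProcessWindowFunction<Deserializedobj, String, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple tuple, Context context, Iterable<Deserializedobj> iterable, Collector<String> out) throws Exception {
                // Count the events that fell into this session window.
                long count = 0;
                for (Deserializedobj obj : iterable) {
                    count++;
                }
                out.collect("Window: " + tuple.getField(0) + " " + context.window() + " count: " + count);
            }
        });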

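Basically, I want to create session-based windows (30-minute gap) over the user activity flowing through a Kinesis stream. My events are not guaranteed to arrive in the correct order, because they are fired on the source side. I want to use the "timestamp" field of my Deserializedobj to make sure each event is placed in the correct frame (session). Before a window is processed, I want to wait a few seconds (4) to make sure I don't miss any events that arrive late.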
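To make the intended grouping concrete, here is a minimal plain-Java sketch with no Flink dependency. It sessionizes the event timestamps of a single key by an inactivity gap, regardless of arrival order. The class `SessionSketch` and its methods are hypothetical names made up for illustration. Treating a pause of exactly the gap length as the same session is a choice of this sketch. The `minTimestampToFire` helper assumes that the built-in bounded-out-of-orderness generator emits a watermark of (largest timestamp seen - delay - 1 ms) and that the default event-time trigger fires once the watermark reaches the window end minus 1 ms. Under those assumptions the "wait" is measured in event time, not wall-clock time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: sessionizes the timestamps of ONE key.
final class SessionSketch {

    /** A session covering [start, end), where end = last event timestamp + gap. */
    public static final class Session {
        public final long start;
        public final long end;
        public final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    /** Groups epoch-millisecond timestamps into sessions, ignoring arrival order. */
    public static List<Session> sessionize(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted); // event time decides the session, not arrival order
        List<Session> sessions = new ArrayList<>();
        if (sorted.length == 0) {
            return sessions;
        }
        long start = sorted[0];
        long last = sorted[0];
        int count = 1;
        for (int i = 1; i < sorted.length; i++) {
            // Assumption of this sketch: only a pause longer than the gap starts a new session.
            if (sorted[i] - last > gapMs) {
                sessions.add(new Session(start, last + gapMs, count));
                start = sorted[i];
                count = 0;
            }
            last = sorted[i];
            count++;
        }
        sessions.add(new Session(start, last + gapMs, count));
        return sessions;
    }

    /**
     * Smallest event timestamp that must be observed before the session can fire,
     * under the watermark and trigger assumptions stated in the lead-in.
     */
    public static long minTimestampToFire(Session session, long maxOutOfOrdernessMs) {
        return session.end + maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Events at minutes 0, 10, 5 (out of order), 50 and 55, with a 30-minute gap.
        long[] events = {0, 10 * minute, 5 * minute, 50 * minute, 55 * minute};
        for (Session s : sessionize(events, 30 * minute)) {
            System.out.println("start=" + s.start + " end=" + s.end + " count=" + s.count
                    + " firesAfterTimestamp=" + minTimestampToFire(s, 5_000L));
        }
    }
}
```

One consequence worth checking against the real job: if this reading of the watermark logic is right, a session for a key is only emitted after some later event pushes the watermark past the session end plus the out-of-orderness bound, so a stream that goes quiet will hold its last sessions open.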

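At the moment I'm getting the following error, which gives no clue about what is happening internally. I captured it from the Exceptions tab of the Apache Flink dashboard: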

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d7aff90[Not completed,task = java.util.concurrent.Executors$RunnableAdapter@3aa696a6[Wrapped task = org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer@fc8fb39]] rejected from java.util.concurrent.ThreadPoolExecutor@520831ff[Shutting down,pool size = 8,active threads = 8,queued tasks = 0,completed tasks = 14]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:473)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:345)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
