在Flink中,如何在flink中将累积值转换为增量值,然后通过某些键进行汇总

问题描述

如何在flink中将累加值转换为累加值(某些键被视为用户,然后累加值成为两个相邻键的累加值),然后根据累加值(时间维度,一个键)进行汇总(总和)

例如,原始数据为:

时间A B值

0 1 1 1

0 2 2 2

0 1 1 4

0 2 2 3

1 1 1 5

1 2 2 6

转换为增量值后,我们得到了

时间A B值

0 1 1 1

0 2 2 2

0 1 1 3

0 2 2 1

1 1 1 2

1 2 2 3

然后我们通过(时间,A)进行汇总,得出的最终结果是

时间A值

0 1 4

0 2 3

1 1 2

1 2 3

是否有一个程序可以同时执行这两项操作? 一种解决方案是使用会话窗口或全局窗口将原始表转换为增量表并将其存储在另一个位置,然后启动另一个任务来汇总结果?但这会占用额外的存储空间。

对不起,我的英语不好,谢谢您的建议。

解决方法

不需要两个单独的应用程序或存储任何东西。只需让第一步的输出流入第二步即可。从概念上讲就是

results = input
  .somehowDoTheIncrementalPart()
  .thenAggregate();

或者在SQL中,您可以使用嵌套查询,例如

SELECT ts,sum(diff) FROM ( 
  SELECT ts,userId,diff 
  FROM events 
  MATCH_RECOGNIZE ( 
    PARTITION BY id 
    ORDER BY ts 
    MEASURES 
      p2.v - p1.v AS diff,p2.id AS userId,p2.ts AS ts 
    AFTER MATCH SKIP TO LAST p2 
    PATTERN (p1 p2) 
    DEFINE p1 AS TRUE,p2 AS TRUE )
) GROUP BY ts,userId