AppendStreamTableSink doesn't support consuming update changes produced by node Join(joinType=[InnerJoin])

Problem description

When I run the following statement with Flink SQL, it fails with the error below.

Requirement

Group the rows of user_behavior_kafka_table by the user_id field, then keep the row with the largest ts value in each group.

SQL executed

SELECT user_id, item_id, ts
FROM user_behavior_kafka_table AS a
WHERE ts = (SELECT MAX(b.ts)
            FROM user_behavior_kafka_table AS b
            WHERE a.user_id = b.user_id);
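For reference, the same "latest row per user" requirement is often written in Flink SQL with the deduplication pattern (ROW_NUMBER over a partition ordered by ts descending). This is a sketch of that formulation, not a verified fix for the error below, since on an unbounded stream it still produces an updating result:

```sql
-- Deduplication pattern: keep the row with the largest ts per user_id.
-- Note: on an unbounded stream this still emits update changes, so the
-- sink must be able to consume them.
SELECT user_id, item_id, ts
FROM (
  SELECT user_id, item_id, ts,
         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS rn
  FROM user_behavior_kafka_table
) t
WHERE rn = 1;
```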

Flink version

1.11.2

Error message

AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin],where=[((user_id = user_id0) AND (ts = EXPR$0))],select=[user_id,ts,user_id0,EXPR$0],leftInputSpec=[NoUniqueKey],rightInputSpec=[JoinKeyContainsUniqueKey])
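The restriction comes from the changelog type rather than the join logic: the MAX aggregation in the subquery emits update/retract messages as new events arrive, the join propagates them, and the registered sink is append-only. Even this simpler aggregation over the same table would hit the same class of error with an append-only sink (illustrative sketch):

```sql
-- This per-key aggregation also emits update changes (the MAX for a
-- user_id is revised as later events arrive), so an
-- AppendStreamTableSink cannot consume its output either.
SELECT user_id, MAX(ts) AS ts
FROM user_behavior_kafka_table
GROUP BY user_id;
```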

Job deployment

On YARN

Table information

  • user_behavior_kafka_table — data consumed from a Kafka topic

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":100}

{"user_id":"ccc","item_id":"11-222-334","comment":"ccc access item at","ts":200}

{"user_id":"ccc","ts":300}

{"user_id":"bbb","comment":"bbb access item at","ts":200}

{"user_id":"aaa","ts":400}

{"user_id":"ccc","ts":400}

{"user_id":"vvv","comment":"vvv access item at","ts":200}

{"user_id":"bbb","ts":300}

{"user_id":"aaa","ts":300}

{"user_id":"ccc","ts":100}

{"user_id":"bbb","ts":100}

  • user_behavior_hive_table — expected result

{"user_id":"aaa","ts":400}

{"user_id":"bbb","ts":200}

Solution

No effective solution to this problem has been found yet; I am still searching and will update this post.

If you have found a good solution, please send it to me together with a link to this post.

Email: dio#foxmail.com (replace # with @)