在Flink SQL中连接连续查询

问题描述

我正在尝试连接两个连续的查询,但始终遇到以下错误

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.\nPlease check the documentation for the set of currently supported sql features.

这是表的定义:

CREATE TABLE `Combined` (
    `machineID` STRING,`cycleID` BIGINT,`start` TIMESTAMP(3),`end` TIMESTAMP(3),WATERMARK FOR `end` AS `end` - INTERVAL '5' SECOND,`sensor1` FLOAT,`sensor2` FLOAT
)

和插入查询

INSERT INTO `Combined` 
SELECT
    a.`MachineID`,a.`cycleID`,MAX(a.`start`) `start`,MAX(a.`end`) `end`,MAX(a.`sensor1`) `sensor1`,MAX(m.`sensor2`) `sensor2`
FROM `Aggregated` a,`Machinestatus` m
WHERE 
    a.`MachineID` = m.`MachineID` AND 
    a.`cycleID` = m.`cycleID` AND 
    a.`start` = m.`timestamp`
GROUP BY a.`MachineID`,SESSION(a.`start`,INTERVAL '1' SECOND)

在源表AggregatedMachinestatus中,starttimestamp列是带有水印的时间属性

我尝试将联接的输入行强制转换为时间戳,但这并不能解决问题,这意味着我不能使用SESSION,这应该确保仅记录一个数据点每个周期。

非常感谢您的帮助!

解决方法

我对此进行了进一步调查,发现GROUP BY语句在这种情况下没有意义。

此外,SESSION可以用时间窗口代替,这是比较惯用的方法。

INSERT INTO `Combined` 
SELECT
    a.`MachineID`,a.`cycleID`,a.`start`,a.`end`,a.`sensor1`,m.`sensor2`
FROM `Aggregated` a,`MachineStatus` m
WHERE 
    a.`MachineID` = m.`MachineID` AND 
    a.`cycleID` = m.`cycleID` AND 
    m.`timestamp` BETWEEN a.`start` AND a.`start` + INTERVAL '0' SECOND

要了解联接动态表的不同方法,我发现Ververica SQL training非常有用。