SQL查找具有下一个最佳时间戳匹配的行对

问题描述

我的挑战是找到时间戳相邻的行对,并仅保留值字段距离最小(正值之差)的那对行

measurement从带有时间戳和值的不同传感器收集数据。

id | sensor_id | timestamp | value
---+-----------+-----------+------
 1 |         1 | 12:00:00  |     5
 2 |         2 | 12:01:00  |     6
 3 |         1 | 12:02:00  |     4
 4 |         2 | 12:02:00  |     7
 5 |         2 | 12:03:00  |     3
 6 |         1 | 12:05:00  |     3
 7 |         2 | 12:06:00  |     4
 8 |         2 | 12:07:00  |     5
 9 |         1 | 12:08:00  |     6

传感器的值从其时间戳记到下一个记录的时间戳记(相同的sensor_id)都是有效的。

图形表示

enter image description here

下方的绿线表示传感器1(蓝线)和传感器2(红线)的值随时间的距离。

我的目标是

  1. 仅合并两个与时间戳逻辑匹配的传感器的记录(以获取绿线)
  2. 在以下位置查找实例的局部最小值
    • 12:01:00(在12:00:00时,传感器2没有记录)
    • 12:05:00
    • 12:08:00

真实表位于Postgresql数据库中,包含约500万条记录的15个传感器。

测试数据

create table measurement (
    id serial,sensor_id integer,timestamp timestamp,value integer)
;

insert into measurement (sensor_id,timestamp,value)
values
(1,'2020-08-16 12:00:00',5),(2,'2020-08-16 12:01:00',6),(1,'2020-08-16 12:02:00',4),7),'2020-08-16 12:03:00',3),'2020-08-16 12:05:00','2020-08-16 12:06:00','2020-08-16 12:07:00','2020-08-16 12:08:00',6)
;

我的方法

选择2个任意传感器(通过特定的sensor_id),进行自我连接,并仅保留具有先前时间戳的传感器2的记录(对于具有传感器1的时间戳的传感器2的最大时间戳,

select
*
from (
    select
    *,row_number() over (partition by m1.timestamp order by m2.timestamp desc) rownum
    from measurement m1
    join measurement m2
        on m1.sensor_id <> m2.sensor_id
        and m1.timestamp >= m2.timestamp
    --arbitrarily sensor_ids 1 and 2
    where m1.sensor_id = 1
    and m2.sensor_id = 2
) foo
where rownum = 1

union --vice versa

select
*
from (
    select
    *,row_number() over (partition by m2.timestamp order by m1.timestamp desc) rownum
    from measurement m1
    join measurement m2
        on m1.sensor_id <> m2.sensor_id
        and m1.timestamp <= m2.timestamp
    --arbitrarily sensor_ids 1 and 2
    where m1.sensor_id = 1
    and m2.sensor_id = 2
) foo
where rownum = 1
;

但这会返回一对12:00:00,其中传感器2没有数据(不是大问题)
在实际表上,语句执行不会在几个小时后结束(大问题)。

我发现了某些类似的问题,但与我的问题不符

谢谢!

解决方法

您可以使用几个横向连接。例如:

with
t as (select distinct timestamp as ts from measurement)
select
  t.ts,s1.value as v1,s2.value as v2,abs(s1.value - s2.value) as distance
from t,lateral (
  select value
  from measurement m 
  where m.sensor_id = 1 and m.timestamp <= t.ts
  order by timestamp desc
  limit 1
) s1,lateral (
  select value
  from measurement m 
  where m.sensor_id = 2 and m.timestamp <= t.ts
  order by timestamp desc
  limit 1
) s2
order by t.ts

结果:

ts                     v1  v2  distance
---------------------  --  --  --------
2020-08-16 12:01:00.0   5   6         1
2020-08-16 12:02:00.0   4   7         3
2020-08-16 12:03:00.0   4   3         1
2020-08-16 12:05:00.0   3   3         0
2020-08-16 12:06:00.0   3   4         1
2020-08-16 12:07:00.0   3   5         2
2020-08-16 12:08:00.0   6   5         1

请参见DB Fiddle上的运行示例。

此外,如果您要所有时间戳,甚至是12:00:00之类的不匹配时间戳,也可以执行以下操作:

with
t as (select distinct timestamp as ts from measurement)
select
  t.ts,abs(s1.value - s2.value) as distance
from t
left join lateral (
  select value
  from measurement m 
  where m.sensor_id = 1 and m.timestamp <= t.ts
  order by timestamp desc
  limit 1
) s1 on true
left join lateral (
  select value
  from measurement m 
  where m.sensor_id = 2 and m.timestamp <= t.ts
  order by timestamp desc
  limit 1
) s2 on true
order by t.ts

在这种情况下,无法计算距离。

结果:

ts                     v1      v2  distance
---------------------  --  ------  --------
2020-08-16 12:00:00.0   5  <null>    <null>
2020-08-16 12:01:00.0   5       6         1
2020-08-16 12:02:00.0   4       7         3
2020-08-16 12:03:00.0   4       3         1
2020-08-16 12:05:00.0   3       3         0
2020-08-16 12:06:00.0   3       4         1
2020-08-16 12:07:00.0   3       5         2
2020-08-16 12:08:00.0   6       5         1
,

第一步是计算每个时间戳记之间的差异。一种方法是使用横向联接和条件聚合:

select t.timestamp,max(m.value) filter (where s.sensor_id = 1) as value_1,max(m.value) filter (where s.sensor_id = 2) as value_2,abs(max(m.value) filter (where s.sensor_id = 2) -
           max(m.value) filter (where s.sensor_id = 1)
          ) as diff
from (values (1),(2)) s(sensor_id) cross join
     (select distinct timestamp
      from measurement
      where sensor_id in (1,2)
     ) t left join lateral
     (select m.value
      from measurement m 
      where m.sensor_id = s.sensor_id and
            m.timestamp <= t.timestamp
      order by m.timestamp desc
      limit 1 
     ) m
     on 1=1
group by timestamp;

现在的问题是,差异何时输入局部最小值。对于您的样本数据,本地最小值全部为一个时间单位。这意味着您可以使用lag()lead()来找到它们:

with t as (
      select  t.timestamp,abs(max(m.value) filter (where s.sensor_id = 2) -
                  max(m.value) filter (where s.sensor_id = 1)
                 ) as diff
      from (values (1),(2)) s(sensor_id) cross join
           (select distinct timestamp
            from measurement
            where sensor_id in (1,2)
           ) t left join lateral
           (select m.value
            from measurement m 
            where m.sensor_id = s.sensor_id and
                  m.timestamp <= t.timestamp
            order by m.timestamp desc
            limit 1 
           ) m
           on 1=1
      group by timestamp
     )
select *
from (select t.*,lag(diff) over (order by timestamp) as prev_diff,lead(diff) over (order by timestamp) as next_diff
      from t
     ) t
where (diff < prev_diff or prev_diff is null) and
      (diff < next_diff or next_diff is null);

这可能不是一个合理的假设。因此,在应用此逻辑之前,请滤除相邻的重复值:

select *
from (select t.*,lead(diff) over (order by timestamp) as next_diff
      from (select t.*,lag(diff) over (order by timestamp) as test_for_dup
            from t
           ) t
      where test_for_dup is distinct from diff
     ) t
where (diff < prev_diff or prev_diff is null) and
      (diff < next_diff or next_diff is null)

Here是db 小提琴。

,

填充缺失值需要窗口函数和与两个传感器交叉的每分钟的笛卡尔积。

invars cte接受参数。

with invars as (
  select '2020-08-16 12:00:00'::timestamp as start_ts,'2020-08-16 12:08:00'::timestamp as end_ts,array[1,2] as sensor_ids
),

创建minute x sensor_id的矩阵

calendar as (
  select g.minute,s.sensor_id,sensor_ids[1] as sid1,sensor_ids[2] as sid2
    from invars i
   cross join generate_series(
           i.start_ts,i.end_ts,interval '1 minute'
         ) as g(minute)
   cross join unnest(i.sensor_ids) as s(sensor_id)
),

每次从mgrp获得新值时都查找sensor_id

gaps as (
  select c.minute,c.sensor_id,m.value,sum(case when m.value is null then 0 else 1 end)
            over (partition by c.sensor_id 
                      order by c.minute) as mgrp,c.sid1,c.sid2
    from calendar c
         left join measurement m
                on m.timestamp = c.minute 
               and m.sensor_id = c.sensor_id
),

通过结转最新值来插补缺失值

interpolated as (
  select minute,sensor_id,coalesce(
           value,first_value(value) over
                    (partition by sensor_id,mgrp
                         order by minute)
         ) as value,sid1,sid2
    from gaps
)

执行distance计算({{1}可能是sum()max(),这没什么区别。

min()

结果:

select minute,sum(value) filter (where sensor_id = sid1) as value1,sum(value) filter (where sensor_id = sid2) as value2,abs(
         sum(value) filter (where sensor_id = sid1) 
         - sum(value) filter (where sensor_id = sid2)
       ) as distance
  from interpolated
 group by minute
 order by minute;

请参阅this working fiddle

,

窗口功能和检查相邻区域。 (您将需要一个额外的反自加入来删除重复项,并为稳定婚姻问题发明决胜局)


SELECT id,ztimestamp,value
        --,prev_ts,next_ts,(ztimestamp - prev_ts) AS prev_span,(next_ts - ztimestamp) AS next_span,(sensor_id <> prev_sensor) AS prev_valid,(sensor_id <> next_sensor) AS next_valid,CASE WHEN (sensor_id <> prev_sensor AND sensor_id <> next_sensor) THEN
                CASE WHEN (ztimestamp - prev_ts) < (next_ts - ztimestamp) THEN prev_id ELSE next_id END
        WHEN (sensor_id <> prev_sensor) THEN prev_id
        WHEN (sensor_id <> next_sensor) THEN next_id
        ELSE NULL END AS best_neigbor
 FROM (
        SELECT id,value,lag(id) OVER www AS prev_id,lead(id) OVER www AS next_id,lag(sensor_id) OVER www AS prev_sensor,lead(sensor_id) OVER www AS next_sensor,lag(ztimestamp) OVER www AS prev_ts,lead(ztimestamp) OVER www AS next_ts
        FROM measurement
        WINDOW www AS (order by ztimestamp)
        ) q
ORDER BY ztimestamp,sensor_id
        ;

结果:


DROP SCHEMA
CREATE SCHEMA
SET
CREATE TABLE
INSERT 0 9
 id | sensor_id |     ztimestamp      | value | prev_span | next_span | prev_valid | next_valid | best_neigbor 
----+-----------+---------------------+-------+-----------+-----------+------------+------------+--------------
  1 |         1 | 2020-08-16 12:00:00 |     5 |           | 00:01:00  |            | t          |            2
  2 |         2 | 2020-08-16 12:01:00 |     6 | 00:01:00  | 00:01:00  | t          | t          |            3
  3 |         1 | 2020-08-16 12:02:00 |     4 | 00:01:00  | 00:00:00  | t          | t          |            4
  4 |         2 | 2020-08-16 12:02:00 |     7 | 00:00:00  | 00:01:00  | t          | f          |            3
  5 |         2 | 2020-08-16 12:03:00 |     3 | 00:01:00  | 00:02:00  | f          | t          |            6
  6 |         1 | 2020-08-16 12:05:00 |     3 | 00:02:00  | 00:01:00  | t          | t          |            7
  7 |         2 | 2020-08-16 12:06:00 |     4 | 00:01:00  | 00:01:00  | t          | f          |            6
  8 |         2 | 2020-08-16 12:07:00 |     5 | 00:01:00  | 00:01:00  | f          | t          |            9
  9 |         1 | 2020-08-16 12:08:00 |     6 | 00:01:00  |           | t          |            |            8
(9 rows)