SQL - inserting rows at regular 5-minute intervals into a table with irregular timestamps

Problem description

Given a table containing entries with irregular timestamps, "breaks" must be inserted at regular 5-minute intervals (the associated data can/will be NULL).

I was thinking of taking the start time and building a subquery with a window function that adds 5-minute increments to that start time - but the only way I can think of to increment the value is row_number.

WITH data as (
  select id, data, cast(date_and_time as double) * 1000 as time_milliseconds
  from t1
), -- original data

start_times as (
  select id, MIN(CAST(date_and_time as double) * 1000) as start_time
  from t1
  GROUP BY id
), -- first timestamp for each id

boundries as (
  SELECT T1.id,
         (row_number() OVER (PARTITION BY T1.id ORDER BY T1.date_and_time) - 1) * 300000 + start_times.start_time as boundry
  from T1
  INNER JOIN start_times ON start_times.id = T1.id
) -- increment the number of 5 min added on each row and later full join boundries table with original data

However, this limits me to the number of rows that exist for an id in the original data table; if the timestamps are spread out, that row count cannot cover the number of 5-minute intervals that need to be added.

Sample data:

initial data:

 |-----------|------------------|------------------|
 |   id      |     value        |    timestamp     |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01.011  | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30.041  |
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20.231  |
 |-----------|------------------|------------------|
 |     1     |    3             |    15:00:00.312  |

data after my query:

 |-----------|------------------|------------------|
 |   id      |     value        | timestamp (UNIX) |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01      | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:05:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:10:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:15:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:20:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|  <-- Jumping directly to 15:00:00 (WRONG! :( need to insert more 5 min breaks here )
 |     1     |    3             |    15:00:00      |  



I wanted to create a temporary table in HIVE and fill it with x rows representing the 5-minute intervals from the start time to the end time of the data table, but I cannot find any way to do that.

Is there any way to do this with a "for loop"? Any suggestions would be appreciated.

Thanks

Solution

You can try computing the difference between the current timestamp and the next one, dividing it by 300 to get the number of ranges, generating a string of num_ranges spaces, and exploding it to generate the rows.
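
The row-generation part in isolation, as a minimal sketch of my own (it assumes a Hive version that allows a FROM-less SELECT in the subquery): space(n) produces a string of n blanks, splitting it on ' ' yields n+1 elements, and posexplode numbers them 0..n.

-- minimal sketch of the space()/split()/posexplode() row generator
select h.i                 -- returns i = 0, 1, 2, 3 (num_intervals + 1 rows)
from (select 3 as num_intervals) s
lateral view posexplode(split(space(s.num_intervals), ' ')) h as i, x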

Demo:

with your_table as (--initial data example
select stack (3,
              1, 3, '2020-01-01 12:00:01.011',
              1, 4, '2020-01-01 12:03:30.041',
              1, 5, '2020-01-01 12:20:20.231'
) as (id, value, ts)
)

select id, value, ts, next_ts, diff_sec, num_intervals,
       from_unixtime(unix_timestamp(ts) + h.i*300) new_ts,
       coalesce(from_unixtime(unix_timestamp(ts) + h.i*300), ts) as calculated_timestamp
from
(
 select id, value, ts, next_ts,
        (unix_timestamp(next_ts) - unix_timestamp(ts)) diff_sec,
        floor((unix_timestamp(next_ts) - unix_timestamp(ts))/300) num_intervals --diff in seconds / 5 min
 from
 (
  select id, value, ts, lead(ts) over(order by ts) next_ts
    from your_table
 ) s
) s
  lateral view outer posexplode(split(space(cast(s.num_intervals as int)),' ')) h as i, x --this will generate rows

Result:

id  value   ts                      next_ts                 diff_sec    num_intervals   new_ts              calculated_timestamp
1   3       2020-01-01 12:00:01.011 2020-01-01 12:03:30.041 209          0              2020-01-01 12:00:01 2020-01-01 12:00:01
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:03:30 2020-01-01 12:03:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:08:30 2020-01-01 12:08:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:13:30 2020-01-01 12:13:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:18:30 2020-01-01 12:18:30
1   5       2020-01-01 12:20:20.231 \N                      \N           \N             \N                  2020-01-01 12:20:20.231

Extra rows were added. I kept all the intermediate columns for debugging purposes.
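
If only the final shape is needed, a possible cleanup of my own (not part of the answer above): keep value only on the original row of each group (generated position 0, or NULL for the last row per id) and return NULL on the inserted breaks; I also add PARTITION BY id so several ids are handled.

-- sketch only: same technique, trimmed to the final columns
-- (assumes the same your_table CTE as in the demo above)
select id,
       case when coalesce(h.i, 0) = 0 then value end             as value,               -- NULL on inserted breaks
       coalesce(from_unixtime(unix_timestamp(ts) + h.i*300), ts) as calculated_timestamp
from
(
 select id, value, ts,
        floor((unix_timestamp(next_ts) - unix_timestamp(ts))/300) as num_intervals
 from
 (
  select id, value, ts, lead(ts) over(partition by id order by ts) as next_ts
    from your_table
 ) s
) s
  lateral view outer posexplode(split(space(cast(s.num_intervals as int)),' ')) h as i, x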

Another answer

A recursive query could help here, but Hive does not support those (more info).

You could consider creating the table outside of Hive, or writing a UDF.

Either way this query is likely to be expensive, so depending on how frequently you run it, a materialized view/table is recommended.
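
As a hedged sketch of that materialization (the table name filled_initial_data is illustrative, and the SELECT is a stand-in for the full CTE query shown in the sample code below):

-- persist the gap-filled result once instead of re-running the expensive cross join
CREATE TABLE filled_initial_data STORED AS ORC AS
SELECT id, value, timestamp     -- stand-in: use the full query from the sample code below
FROM initial_data2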

The example below uses a UDF named inbetween, created with pyspark, to run the query. It:

  1. generates the values between the minimum and maximum timestamps in the dataset
  2. uses CTEs and the UDF to build the temporary intervals set
  3. generates all possible intervals in possible_records using an expensive cross join
  4. uses a left join to retrieve the records that have actual values (for demo purposes I represent the timestamp values only as time strings)

The code below shows how this can be evaluated with hive.

Sample code


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, ArrayType

# returns the epoch seconds between min_value and max_value in 5-minute (300 s) steps
inbetween = lambda min_value, max_value: [*range(min_value, max_value, 5*60)]

udf_inbetween = udf(inbetween, ArrayType(IntegerType()))
sqlContext.udf.register("inbetween", udf_inbetween)

sqlContext.sql("""
WITH max_timestamp(t) as (
  select max(timestamp) as t from initial_data2
),min_timestamp(t) as (
  select min(timestamp) as t from initial_data2
),intervals as (
   select explode(inbetween(unix_timestamp(mint.t),unix_timestamp(maxt.t))) as interval_time FROM
   min_timestamp mint,max_timestamp maxt
),unique_ids as (
  select distinct id from initial_data2
),interval_times as (
   select interval_time from (
   select 
       cast(from_unixtime(interval_time) as timestamp) as interval_time 
   from 
       intervals
   UNION
   select distinct d.timestamp as interval_time from initial_data2 d
   )
   order by interval_time asc
),possible_records as (
   select
      distinct 
      d.id,i.interval_time
   FROM
      interval_times i,unique_ids d
   
)
select 
    p.id,d.value,split(cast(p.interval_time as string)," ")[1] as timestamp
FROM
  possible_records p
LEFT JOIN
   initial_data2 d ON d.id = p.id and d.timestamp = p.interval_time

ORDER BY p.id,p.interval_time
""").show(20)


Output

+---+-----+---------+
| id|value|timestamp|
+---+-----+---------+
|  1|    3| 12:00:01|
|  1|    4| 12:03:30|
|  1| null| 12:05:01|
|  1| null| 12:10:01|
|  1|    5| 12:12:20|
|  1| null| 12:15:01|
|  1| null| 12:20:01|
|  1| null| 12:25:01|
|  1| null| 12:30:01|
|  1| null| 12:35:01|
|  1| null| 12:40:01|
|  1| null| 12:45:01|
|  1| null| 12:50:01|
|  1| null| 12:55:01|
|  1| null| 13:00:01|
|  1| null| 13:05:01|
|  1| null| 13:10:01|
|  1| null| 13:15:01|
|  1| null| 13:20:01|
|  1| null| 13:25:01|
+---+-----+---------+

only showing top 20 rows

Data preparation for replication


from pyspark.sql import Row

raw_data1 = [
    {"id": 1, "value": 3, "timestam": "12:00:01"},
    {"id": 1, "value": 4, "timestam": "12:03:30"},
    {"id": 1, "value": 5, "timestam": "12:12:20"},
    {"id": 1, "value": 3, "timestam": "15:00:00"},
]
raw_data = [*map(lambda entry: Row(**entry), raw_data1)]

initial_data = sqlContext.createDataFrame(raw_data, schema="id int, value int, timestam string")
initial_data.createOrReplaceTempView('initial_data')

# the view must also expose "value", since the query above selects d.value
sqlContext.sql("create or replace temp view initial_data2 as select id, value, cast(timestam as timestamp) as timestamp from initial_data")