Problem description
Given a table with entries at irregular timestamps, I have to insert "breaks" at regular 5-minute intervals (the associated data can/will be NULL).
I was thinking of taking the start time and building a subquery with a window function that adds 5-minute increments to the start time - but the only way I can think of to increment the value is with row_number.
WITH data AS (              -- original data
    SELECT id, data, CAST(date_and_time AS double) * 1000 AS time_milliseconds
    FROM t1
),
start_times AS (            -- first timestamp for each id
    SELECT id, MIN(CAST(date_and_time AS double) * 1000) AS start_time
    FROM t1
    GROUP BY id
),
boundries AS (
    SELECT t1.id,
           (ROW_NUMBER() OVER (PARTITION BY t1.id ORDER BY t1.date_and_time) - 1) * 300000
               + start_times.start_time AS boundry
    FROM t1
    INNER JOIN start_times ON start_times.id = t1.id
)  -- increment the number of 5 min added on each row, and later
   -- FULL JOIN the boundries table with the original data
However, this limits me to the number of rows that exist per id in the original data table; if the timestamps are spread out, the row count cannot cover the number of 5-minute intervals that need to be added.
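The mismatch can be sketched with some back-of-the-envelope arithmetic in plain Python (the numbers are taken from the sample data below; this is only an illustration, not part of any query):

```python
# Rows available for id = 1 vs. 5-minute boundaries needed to
# bridge the 12:12:20 -> 15:00:00 gap in the sample data.
rows_for_id = 4                                        # entries present for id = 1
gap_seconds = 15 * 3600 - (12 * 3600 + 12 * 60 + 20)   # 15:00:00 minus 12:12:20
intervals_needed = gap_seconds // 300                  # one boundary per 5 minutes

# row_number() can only produce as many boundaries as there are rows,
# so 4 rows cannot cover the ~33 intervals this single gap requires.
```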
Sample data:
initial data:
|-----------|------------------|------------------|
| id | value | timestamp |
|-----------|------------------|------------------|
| 1 | 3 | 12:00:01.011 |
|-----------|------------------|------------------|
| 1 | 4 | 12:03:30.041 |
|-----------|------------------|------------------|
| 1 | 5 | 12:12:20.231 |
|-----------|------------------|------------------|
| 1 | 3 | 15:00:00.312 |
data after my query:
|-----------|------------------|------------------|
| id | value | timestamp (UNIX) |
|-----------|------------------|------------------|
| 1 | 3 | 12:00:01 |
|-----------|------------------|------------------|
| 1 | 4 | 12:03:30 |
|-----------|------------------|------------------|
| 1 | NULL | 12:05:01 | <-- Data from "boundries"
|-----------|------------------|------------------|
| 1 | NULL | 12:10:01 | <-- Data from "boundries"
|-----------|------------------|------------------|
| 1 | 5 | 12:12:20 |
|-----------|------------------|------------------|
| 1 | NULL | 12:15:01 | <-- Data from "boundries"
|-----------|------------------|------------------|
| 1 | NULL | 12:20:01 | <-- Data from "boundries"
|-----------|------------------|------------------| <-- Jumping directly to 15:00:00 (WRONG! :( need to insert more 5 min breaks here )
| 1 | 3 | 15:00:00 |
I wanted to create a temporary table in HIVE and populate it with x rows representing the 5-minute intervals from the start time to the end time of the data table, but I could not find any way to do that.
Any way of using a "for loop"? Any suggestion would be greatly appreciated.
Thanks
Solution
You can try to calculate the difference between the current timestamp and the next one, divide it by 300 to get the number of ranges, generate a string of spaces with length num_ranges, and explode it to generate rows.
Demo:
with your_table as (--initial data example
select stack (3, --rows of (id, value, ts)
       1,3,'2020-01-01 12:00:01.011',
       1,4,'2020-01-01 12:03:30.041',
       1,5,'2020-01-01 12:20:20.231'
       ) as (id,value,ts)
)
select id,value,ts,next_ts,diff_sec,num_intervals,
       from_unixtime(unix_timestamp(ts)+h.i*300) new_ts,
       coalesce(from_unixtime(unix_timestamp(ts)+h.i*300),ts) as calculated_timestamp
from
(
select id,value,ts,next_ts,
       (unix_timestamp(next_ts)-unix_timestamp(ts)) diff_sec,
       floor((unix_timestamp(next_ts)-unix_timestamp(ts))/300) num_intervals --diff in seconds / 5 min
from
(
select id,value,ts,
       lead(ts) over(partition by id order by ts) next_ts
from your_table
) s
) s
lateral view outer posexplode(split(space(cast(s.num_intervals as int)),' ')) h as i,x --this will generate rows
Result:
id value ts next_ts diff_sec num_intervals new_ts calculated_timestamp
1 3 2020-01-01 12:00:01.011 2020-01-01 12:03:30.041 209 0 2020-01-01 12:00:01 2020-01-01 12:00:01
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:03:30 2020-01-01 12:03:30
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:08:30 2020-01-01 12:08:30
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:13:30 2020-01-01 12:13:30
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:18:30 2020-01-01 12:18:30
1 5 2020-01-01 12:20:20.231 \N \N \N \N 2020-01-01 12:20:20.231
Additional rows were added. I have kept all the intermediate columns for debugging purposes.
Alternatively, a recursive query could help here, but Hive does not support those (more info).
You could consider building the table outside of Hive, or writing a UDF.
Either way, this query is likely to be expensive, so depending on your frequency, a materialized view/table is recommended.
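The row-generation trick above (space()/split()/posexplode per gap) can be mirrored in plain Python to see how many rows each gap yields; this is an illustrative sketch working in unix seconds, not Hive code:

```python
def fill_gap(ts, next_ts, step=300):
    """Mimic posexplode(split(space(n), ' ')): space(n) split on ' '
    has n + 1 elements, i.e. positions 0..n, so a gap of n full
    5-minute steps yields the original timestamp plus n shifted copies."""
    if next_ts is None:
        # lateral view OUTER keeps the last row even though posexplode
        # receives NULL: only the original timestamp survives
        return [ts]
    num_intervals = (next_ts - ts) // step
    return [ts + i * step for i in range(num_intervals + 1)]

# the 12:03:30 -> 12:20:20 gap from the demo, as seconds within the day
start = 12 * 3600 + 3 * 60 + 30
end = 12 * 3600 + 20 * 60 + 20
```

With a 1010-second gap, num_intervals is floor(1010 / 300) = 3, so `fill_gap(start, end)` yields four timestamps at offsets 0, 300, 600, and 900 seconds, matching the four demo rows for value 4.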
This example shows the use of a UDF inbetween created with pyspark to run the query. It
- generates the values between the minimum and maximum timestamps in the dataset
- creates a temporary table intervals using CTEs and the UDF
- generates all the possible intervals in possible_records using an expensive cross join
- uses a left join to retrieve the records that have actual values (for demo purposes I have represented the timestamp values as time strings only)
The code below shows how this can be evaluated with hive.
Sample code
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,ArrayType
inbetween = lambda min_value,max_value : [*range(min_value,max_value,5*60)]
udf_inbetween = udf(inbetween,ArrayType(IntegerType()))
sqlContext.udf.register("inbetween",udf_inbetween)
sqlContext.sql("""
WITH max_timestamp(t) as (
select max(timestamp) as t from initial_data2
),min_timestamp(t) as (
select min(timestamp) as t from initial_data2
),intervals as (
select explode(inbetween(unix_timestamp(mint.t),unix_timestamp(maxt.t))) as interval_time FROM
min_timestamp mint,max_timestamp maxt
),unique_ids as (
select distinct id from initial_data2
),interval_times as (
select interval_time from (
select
cast(from_unixtime(interval_time) as timestamp) as interval_time
from
intervals
UNION
select distinct d.timestamp as interval_time from initial_data2 d
)
order by interval_time asc
),possible_records as (
select
distinct
d.id,i.interval_time
FROM
interval_times i,unique_ids d
)
select
p.id,d.value,split(cast(p.interval_time as string)," ")[1] as timestamp
FROM
possible_records p
LEFT JOIN
initial_data2 d ON d.id = p.id and d.timestamp = p.interval_time
ORDER BY p.id,p.interval_time
""").show(20)
Output
+---+-----+---------+
| id|value|timestamp|
+---+-----+---------+
| 1| 3| 12:00:01|
| 1| 4| 12:03:30|
| 1| null| 12:05:01|
| 1| null| 12:10:01|
| 1| 5| 12:12:20|
| 1| null| 12:15:01|
| 1| null| 12:20:01|
| 1| null| 12:25:01|
| 1| null| 12:30:01|
| 1| null| 12:35:01|
| 1| null| 12:40:01|
| 1| null| 12:45:01|
| 1| null| 12:50:01|
| 1| null| 12:55:01|
| 1| null| 13:00:01|
| 1| null| 13:05:01|
| 1| null| 13:10:01|
| 1| null| 13:15:01|
| 1| null| 13:20:01|
| 1| null| 13:25:01|
+---+-----+---------+
only showing top 20 rows
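As a side note, the grid the inbetween helper produces can be sanity-checked without a Spark session; the lambda below is the same one registered above, run as plain Python:

```python
# Same helper as registered above: unix seconds in 5-minute steps
inbetween = lambda min_value, max_value: [*range(min_value, max_value, 5 * 60)]

grid = inbetween(0, 20 * 60)   # a 20-minute window starting at second 0
# range() excludes the upper bound, which is one reason the
# interval_times CTE UNIONs the actual timestamps back into the grid.
```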
Data preparation for replication
from pyspark.sql import Row

raw_data1 = [
    {"id": 1, "value": 3, "timestam": "12:00:01"},
    {"id": 1, "value": 4, "timestam": "12:03:30"},
    {"id": 1, "value": 5, "timestam": "12:12:20"},
    {"id": 1, "value": 3, "timestam": "15:00:00"},
]
raw_data = [*map(lambda entry: Row(**entry), raw_data1)]
initial_data = sqlContext.createDataFrame(raw_data, schema="id int,value int,timestam string")
initial_data.createOrReplaceTempView('initial_data')
sqlContext.sql("create or replace temp view initial_data2 as "
               "select id, value, cast(timestam as timestamp) as timestamp from initial_data")
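For a quick check of the overall pipeline logic without Spark, the same steps (a 5-minute grid between the min and max timestamps, UNIONed with the real timestamps, then left-joined back to the values) can be emulated in plain Python. Everything below is a hypothetical stand-in for illustration, not the answer's code:

```python
rows = [
    {"id": 1, "value": 3, "ts": "12:00:01"},
    {"id": 1, "value": 4, "ts": "12:03:30"},
    {"id": 1, "value": 5, "ts": "12:12:20"},
    {"id": 1, "value": 3, "ts": "15:00:00"},
]

def to_sec(hms):
    h, m, s = map(int, hms.split(":"))
    return h * 3600 + m * 60 + s

def to_hms(sec):
    return "%02d:%02d:%02d" % (sec // 3600, sec // 60 % 60, sec % 60)

secs = [to_sec(r["ts"]) for r in rows]
# "intervals": every 5 minutes between the min and max timestamp ...
grid = set(range(min(secs), max(secs), 300))
# ... UNIONed with the actual timestamps (the interval_times CTE)
grid |= set(secs)

# left join: keep the real value where a timestamp exists, else None (NULL)
actual = {to_sec(r["ts"]): r["value"] for r in rows}
result = [(to_hms(t), actual.get(t)) for t in sorted(grid)]
```

The output mirrors the table above: real rows keep their values, and every generated grid timestamp (12:05:01, 12:10:01, ...) carries None.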