问题描述
我有一个数据集,该数据集吸收对数据行的最新编辑,但仅吸收最近编辑的版本。 (即,它在update_ts
时间戳列上是递增的)。
原始表:
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
表格更新后
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
| key_1 | 2 |
提取后,我需要为所有以前的更新计算“最新版本”,同时还要考虑任何新的编辑。
这意味着我每次都要进行增量摄取并运行SNAPSHOT输出。这对于我的构建来说非常慢,因为我注意到每次想为数据计算最新版本时,都必须查看所有输出行。
交易n = 1(快照):
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
交易n = 2(APPEND):
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 1 |
| key_2 | 1 |
如何使“最新版本”的计算速度更快?
解决方法
这是一种常见的模式,将从bucketing中受益。
其要点是:根据primary_key
列将输出SNAPSHOT写入存储桶中,其中完全忽略了将更大的输出改组的昂贵步骤。
这意味着您只需要将新数据交换到已经包含先前历史记录的存储桶中即可。
让我们从初始状态开始,在该状态下,我们运行的是先前计算的“最新”版本,该版本运行速度很慢:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT
如果我们使用clean_dataset
列上的存储桶将primary_key
写入到单独计算的存储桶数中,以适应我们期望的数据规模,则需要以下代码:
from transforms.api import transform,Input,Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
@transform(
my_output=Output("/datasets/clean_dataset"),my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input,my_output):
BUCKET_COUNT = 600
PRIMARY_KEY = "primary_key"
ORDER_COL = "update_ts"
updated_keys = my_input.dataframe("added")
last_written = my_output.dataframe("current")
updated_keys.repartition(BUCKET_COUNT,PRIMARY_KEY)
value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]
updated_keys = updated_keys.select(
PRIMARY_KEY,*[F.col(x).alias("updated_keys_" + x) for x in value_cols]
)
last_written = last_written.select(
PRIMARY_KEY,*[F.col(x).alias("last_written_" + x) for x in value_cols]
)
all_rows = updated_keys.join(last_written,PRIMARY_KEY,"fullouter")
latest_df = all_rows.select(
PRIMARY_KEY,*[F.coalesce(
F.col("updated_keys_" + x),F.col("last_written_" + x)
).alias(x) for x in value_cols]
)
my_output.set_mode("replace")
return my_output.write_dataframe(
latest_df,bucket_cols=PRIMARY_KEY,bucket_count=BUCKET_COUNT,sort_by=ORDER_COL
)
运行该命令时,您会在查询计划中注意到该项目移至输出上不再包含交换,这意味着它将不会对数据进行改组。现在,您将看到的唯一交换是在 input 上,它需要以与格式化输出完全相同的方式分发更改(这是非常快的操作)。
然后,此交换保留在fullouter
连接步骤中,然后该连接将利用此交换并快速运行600个任务。最后,我们通过在以前相同的列上显式存储到相同数量的存储桶中,来保持输出的格式。
注意:通过这种方法,每个存储桶中的文件大小会随着时间的推移而增长,而无需考虑增加存储桶数以保持文件大小的需要。最终,您将通过这种技术达到一个阈值,即文件大小超过128MB,并且您将不再高效执行(解决方案是提高BUCKET_COUNT
值)。
您的输出现在将如下所示:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: BUCKET_COUNT by PRIMARY_KEY
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT