问题描述
我有一系列约30个数据集,所有这些数据集都需要结合在一起才能构成一个宽泛的决赛桌。最终表需要约5年的单个表(每年一个表)并将它们合并在一起,然后将这个完整的历史记录与其他表(类似地合并的)的全部历史记录结合在一起,以构成一个大的,历史的,宽的表。
每年的第一个表格的布局如下:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
与其他年份表类似:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 1 |
| key_2 | 1 |
然后将这些结合在一起以创建:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
类似地,合并后的第二种类型的表将导致以下结果:
table_type_2:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
我现在想在table_type_1
和table_type_2
上将primary_key
与year
结合起来以产生更大的表格。我注意到最后的连接需要很长的时间,并且会随机整理大量数据。
如何使它更快?
解决方法
您可以在primary_key
和year
列的每年表上使用存储桶,将它们存储到完全相同数量的存储桶中,以避免在计算最终联接时进行昂贵的交换。
- output: table_type_1_year_0
input: raw_table_type_1_year_0
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
- output: table_type_1_year_1
input: raw_table_type_1_year_1
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
...
- output: table_type_2_year_0
input: raw_table_type_2_year_0
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
- output: table_type_2_year_1
input: raw_table_type_2_year_1
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
...
- output: all_tables
input:
- table_type_1_year_0
- table_type_1_year_1
...
- table_type_2_year_0
- table_type_2_year_1
...
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
注意:在选择BUCKET_COUNT
值时,重要的是要了解应针对最终的all_tables
输出对其进行优化,而对于中间表则不。这意味着您最终可能会得到中间表很小的文件。与all_tables
输出的效率提高相比,这可能是无关紧要的,因为在合并所有内容时不必计算大量的交换。您的存储桶将被预先计算,您只需在输入文件上SortMergeJoin
。
对于如何写出指定数量的存储桶的转换的一个明确示例,我对here的回答可能很有用。
,我的建议是:在小型数据集上进行第一个并集,然后广播该数据集,第一个并集的结果是,spark会将数据集部署在其不同节点上,这将减少混洗次数。 spark上的并集已经过优化,因此您需要考虑的是拥有:从一开始就只选择您需要的列,避免在使用诸如groupByKey ...之类的并集之前进行任何非成本有效的操作,因为spark会在进行最终处理时调用这些操作。我确实建议您避免配置单元,因为它使用的map减少策略与spark sql相比不值得,您可以使用此函数示例来更改键,如果可以,请使用scala直接与spark交互:
Cookies.set("authToken",result.token,{
// secure: true,sameSite: "strict",expires: 1,// domain: "localhost",});