如何使多联接/多联接数据集的计算速度更快?

问题描述

我有一系列约30个数据集,所有这些数据集都需要结合在一起才能构成一个宽泛的决赛桌。最终表需要约5年的单个表(每年一个表)并将它们合并在一起,然后将这个完整的历史记录与其他表(类似地合并的)的全部历史记录结合在一起,以构成一个大的,历史的,宽的表。

每年的第一个表格的布局如下:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |

与其他年份表类似:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 1    |
| key_2       | 1    |

然后将这些结合在一起以创建:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

类似地,合并后的第二种类型的表将导致以下结果:

table_type_2:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

我现在想在table_type_1table_type_2上将primary_keyyear结合起来以产生更大的表格。我注意到最后的连接需要很长的时间,并且会随机整理大量数据。

如何使它更快?

解决方法

您可以在primary_keyyear列的每年表上使用存储桶,将它们存储到完全相同数量的存储桶中,以避免在计算最终联接时进行昂贵的交换。

- output: table_type_1_year_0
  input: raw_table_type_1_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
- output: table_type_1_year_1
  input: raw_table_type_1_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
...
- output: table_type_2_year_0
  input: raw_table_type_2_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
- output: table_type_2_year_1
  input: raw_table_type_2_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)
...
- output: all_tables
  input:
    - table_type_1_year_0
    - table_type_1_year_1
...
    - table_type_2_year_0
    - table_type_2_year_1
...
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY,YEAR)

注意:在选择BUCKET_COUNT值时,重要的是要了解应针对最终的all_tables输出对其进行优化,而对于中间表则。这意味着您最终可能会得到中间表很小的文件。与all_tables输出的效率提高相比,这可能是无关紧要的,因为在合并所有内容时不必计算大量的交换。您的存储桶将被预先计算,您只需在输入文件上SortMergeJoin

对于如何写出指定数量的存储桶的转换的一个明确示例,我对here的回答可能很有用。

,

我的建议是:在小型数据集上进行第一个并集,然后广播该数据集,第一个并集的结果是,spark会将数据集部署在其不同节点上,这将减少混洗次数。 spark上的并集已经过优化,因此您需要考虑的是拥有:从一开始就只选择您需要的列,避免在使用诸如groupByKey ...之类的并集之前进行任何非成本有效的操作,因为spark会在进行最终处理时调用这些操作。我确实建议您避免配置单元,因为它使用的map减少策略与spark sql相比不值得,您可以使用此函数示例来更改键,如果可以,请使用scala直接与spark交互:

Cookies.set("authToken",result.token,{
                        //  secure: true,sameSite: "strict",expires: 1,//  domain: "localhost",});