Dataproc 未使用 pyspark 并行处理大数据

问题描述

我在 GCP 中启动了一个 DataProc 集群,有一个主节点和 3 个工作节点。每个节点有8个vcpu和30G内存。

我开发了一个 pyspark 代码,它从 GCS 读取一个 csv 文件。 csv文件大小约30G。

df_raw = (
    spark
        .read
        .schema(schema)
        .option('header','true')
        .option('quote','"')
        .option('multiline','true')
        .csv(infile)
)
df_raw = df_raw.repartition(20,"Product")
print(df_raw.rdd.getNumPartitions())

以下是我将 pyspark 启动到 dataproc 的方法

gcloud dataproc jobs submit pyspark gs://<my-gcs-bucket>/<my-program>.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \

我得到的分区号只有 1。

在这里附上了节点使用图片供您参考。

enter image description here

似乎它只使用了来自一个工作节点的一个 vCore。

如何与多个分区并行并使用所有节点和更多 vCore?

尝试重新分区到20,但它仍然只使用了一个工作节点的一个vCore,如下:

enter image description here

Pyspark 认分区是 200。所以我很惊讶地看到 dataproc 没有将所有可用资源用于此类任务。

解决方法

这不是 dataproc 问题,而是纯粹的 Spark/pyspark 问题。

为了并行化您的数据,它需要拆分为多个分区 - 该数量大于您拥有的执行程序(总工作内核)数量。 (例如~*2、~*3、...)

有多种方法可以做到这一点,例如:

  1. 将数据拆分到文件或文件夹中,并并行化文件/文件夹列表并处理每个文件/文件夹(或使用已执行此操作的数据库并将此分区保留在 Spark 读取中)。

  2. 在获得 Spark DF 后重新分区您的数据,例如读取执行程序的数量并将它们乘以 N 并重新分区到这么多分区。执行此操作时,您必须选择能够很好地划分数据的列,即分成许多部分,而不是仅分成几个部分,例如按天,按客户 ID,而不是按状态 ID。

df = df.repartition(num_partitions,'partition_by_col1','partition_by_col2')

代码在主节点上运行,并行阶段分布在工作节点之间,例如

df = (
    df.withColumn(...).select(...)...
    .write(...)
)

由于 Spark 函数是惰性的,因此它们仅在您到达像 write 或 collect 这样导致对 DF 求值的步骤时才会运行。

,

您可能想尝试通过 Dataproc 命令行的 --properties 传递 Spark 配置来增加执行程序的数量。所以像

SpannableStringBuilder ssb = new SpannableStringBuilder();
ssb.setSpan(new ImageSpan(image),ssb.length() - 3,ssb.length(),Spannable.SPAN_EXCLUSIVE_EXCLUSIVE);
itemViewHolder.kuranArabic.setText(ssb);