问题描述
我在 GCP 中启动了一个 DataProc 集群,有一个主节点和 3 个工作节点。每个节点有8个vcpu和30G内存。
我开发了一个 pyspark 代码,它从 GCS 读取一个 csv 文件。 csv文件大小约30G。
df_raw = (
spark
.read
.schema(schema)
.option('header','true')
.option('quote','"')
.option('multiline','true')
.csv(infile)
)
df_raw = df_raw.repartition(20,"Product")
print(df_raw.rdd.getNumPartitions())
以下是我将 pyspark 启动到 dataproc 的方法:
gcloud dataproc jobs submit pyspark gs://<my-gcs-bucket>/<my-program>.py \
--cluster=${CLUSTER} \
--region=${REGION} \
我得到的分区号只有 1。
如何与多个分区并行并使用所有节点和更多 vCore?
尝试重新分区到20,但它仍然只使用了一个工作节点的一个vCore,如下:
Pyspark 默认分区是 200。所以我很惊讶地看到 dataproc 没有将所有可用资源用于此类任务。
解决方法
这不是 dataproc 问题,而是纯粹的 Spark/pyspark 问题。
为了并行化您的数据,它需要拆分为多个分区 - 该数量大于您拥有的执行程序(总工作内核)数量。 (例如~*2、~*3、...)
有多种方法可以做到这一点,例如:
-
将数据拆分到文件或文件夹中,并并行化文件/文件夹列表并处理每个文件/文件夹(或使用已执行此操作的数据库并将此分区保留在 Spark 读取中)。
-
在获得 Spark DF 后重新分区您的数据,例如读取执行程序的数量并将它们乘以 N 并重新分区到这么多分区。执行此操作时,您必须选择能够很好地划分数据的列,即分成许多部分,而不是仅分成几个部分,例如按天,按客户 ID,而不是按状态 ID。
df = df.repartition(num_partitions,'partition_by_col1','partition_by_col2')
代码在主节点上运行,并行阶段分布在工作节点之间,例如
df = (
df.withColumn(...).select(...)...
.write(...)
)
由于 Spark 函数是惰性的,因此它们仅在您到达像 write 或 collect 这样导致对 DF 求值的步骤时才会运行。
,您可能想尝试通过 Dataproc 命令行的 --properties
传递 Spark 配置来增加执行程序的数量。所以像
SpannableStringBuilder ssb = new SpannableStringBuilder();
ssb.setSpan(new ImageSpan(image),ssb.length() - 3,ssb.length(),Spannable.SPAN_EXCLUSIVE_EXCLUSIVE);
itemViewHolder.kuranArabic.setText(ssb);