PySpark won't start more tasks despite having more executors

Problem description

I am reading a widely nested Avro dataset with a YARN-based Spark application started from pyspark. My goal is to repartition it and write it out as Parquet files, but I keep running into out-of-memory GC errors.

I run spark-submit through a wrapper library that hides the details, but I can share my Spark configuration, the pyspark code, and the executor summary. The configuration below is passed to spark-submit when the driver is launched.

My Spark configuration, as inferred from pm.spark.sparkContext.getConf().getAll() (I'm not sure how to collapse it; one way to print it more compactly is sketched after the list):

spark.eventLog.enabled = true

spark.network.crypto.enabled = true

spark.sql.queryExecutionListeners = com.cloudera.spark.lineage.NavigatorQueryListener

spark.authenticate.sasl.encryption.aes.enabled = true

spark.authenticate.enableSaslEncryption = true

spark.ui.proxyBase = /proxy/application_1598440892293_34498

spark.serializer = org.apache.spark.serializer.KryoSerializer

spark.driver.host = xxx.xxxx.com

spark.sql.hive.metastore.jars = ${env:HADOOP_COMMON_HOME}/../hive/lib/*:${env:HADOOP_COMMON_HOME}/client/*

spark.dynamicAllocation.maxExecutors = 30

spark.executorEnv.PYTHONPATH = /opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/:/opt/cloudera/parcels/SPARK2/lib/spark2/python:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip

spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

spark.driver.memory = 30g

spark.driver.extraLibraryPath = /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native

spark.driver.memoryOverhead = 9g

spark.ui.enabled = true

spark.driver.appUIAddress = http://xxxx.xxxx.com:4045

spark.executor.id = driver

spark.dynamicAllocation.schedulerBacklogTimeout = 1

spark.app.id = application_1598440892293_34498

spark.yarn.jars = local:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/jars/*

spark.app.name = PySparkShell

spark.sql.hive.metastore.version = 1.1.0

spark.yarn.config.gatewayPath = /opt/cloudera/parcels

spark.extraListeners = com.cloudera.spark.lineage.NavigatorAppListener

spark.shuffle.encryption.enabled = true

spark.sql.warehouse.dir = /user/hive/warehouse

spark.sql.catalogImplementation = hive

spark.driver.extraClasspath = /apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar:/path/to/spark-avro_2.11-2.4.0.jar

spark.yarn.config.replacementPath = {{HADOOP_COMMON_HOME}}/../../..

spark.yarn.historyServer.address = http://xxxx.xxxx.com:18089

spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.RM_HA_URLS = xxxx.xxxx.com:8090,xxxx.xxxx.com:8090

spark.authenticate = true

spark.port.maxRetries = 1000

spark.lineage.log.dir = /hadooplog/spark2/lineage

spark.eventLog.dir = hdfs://nameservice1/user/spark/spark2ApplicationHistory

spark.sql.files.maxPartitionBytes = 67108864

spark.ui.killEnabled = true

spark.yarn.secondary.jars = ojdbc7.jar

spark.executor.cores = 4

spark.dynamicAllocation.executorIdleTimeout = 60

spark.yarn.am.extraLibraryPath = /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native

spark.io.encryption.enabled = true

spark.dynamicAllocation.initialExecutors = 3

spark.serializer.objectStreamReset = 100

spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES = https://lxe1731.allstate.com:8090/proxy/application_1598440892293_34498,https://xxxx.xxxx.com:8090/proxy/application_1598440892293_34498

spark.submit.deployMode = client

spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS = xxxx.xxxx.com,xxxx.xxxx.com

spark.io.compression.codec = snappy

spark.shuffle.service.enabled = true

spark.executor.memory = 10g

spark.repl.local.jars = file:///apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar

spark.driver.port = 41668

spark.executor.extraLibraryPath = /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native

spark.shuffle.service.port = 7337

spark.unsafe.sorter.spill.read.ahead.enabled = false

spark.lineage.enabled = true

spark.master = yarn

spark.debug.maxToStringFields = 10000

spark.rdd.compress = true

spark.dynamicAllocation.minExecutors = 0

spark.yarn.isPython = true

spark.dynamicAllocation.enabled = true

spark.executor.extraClasspath = /apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar:/home/rmchh/jars/spark-avro_2.11-2.4.0.jar

spark.ui.showConsoleProgress = true

spark.yarn.dist.jars = file:///apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar
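
As an aside, here is a minimal sketch (my own, not from the original post) of one way to print that configuration in a compact, sorted form, assuming pm is the PySparkManager instance created by the wrapper library:

# Hedged sketch: getConf().getAll() returns a list of (key, value) tuples;
# sort them and print one "key = value" pair per line.
for key, value in sorted(pm.spark.sparkContext.getConf().getAll()):
    print("{} = {}".format(key, value))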

Gist of the pyspark driver code:

from SparkWrapper import PySparkManager
import json
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField

# Create StructType schema from json file:
with open('schema/pmdm_final_schema.json') as f:
    json_schema = json.load(f)
schema = StructType.fromJson(json_schema)

pm = PySparkManager()
spark = pm.spark

path_to_partition = "/data/res/warehouse/mydata/partition_3/APL_00000.avro"
df = spark.read.format("avro").schema(schema).load(path_to_partition).limit(10)

df.cache()

# Repartition and write as parquet
df.repartition(15).write.mode("overwrite").parquet("/data/res/warehouse/mydata/as_parquet")

Unfortunately, this repartition-and-write never succeeds. It looks like only a single task is ever created. I'm not sure how to make sure multiple tasks are created so that my executors and memory actually get used.
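
As a diagnostic, here is a small sketch (my assumption, not part of the original post) of how the partition counts could be inspected; reading a single .avro file and applying .limit(10) may well collapse the data to one partition, which would be consistent with seeing only one task:

# Hedged diagnostic sketch: check how many partitions the DataFrame has
# before and after the repartition() call from the snippet above.
print("partitions after read: {}".format(df.rdd.getNumPartitions()))

df_repart = df.repartition(15)
print("partitions after repartition: {}".format(df_repart.rdd.getNumPartitions()))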

(Screenshot of the executor summary from the Spark UI.)

Workaround

No effective solution to this problem has been found yet.
