Problem description
I am reading a widely nested avro dataset in a yarn-based spark application started from pyspark. My goal is to repartition it and write it out as parquet files, but I keep running into out-of-memory GC errors.
I am running spark-submit through a wrapper library that hides the details, but I can share my spark configuration, pyspark code, and executor summary. This configuration is passed to spark-submit when the driver is executed:
My spark config, as inferred from pm.spark.sparkContext.getConf().getAll() (I am not sure how to collapse it; see the pretty-printing sketch after the dump below):
spark.eventLog.enabled = true
spark.network.crypto.enabled = true
spark.sql.queryExecutionListeners = com.cloudera.spark.lineage.NavigatorQueryListener
spark.authenticate.sasl.encryption.aes.enabled = true
spark.authenticate.enableSaslEncryption = true
spark.ui.proxyBase = /proxy/application_1598440892293_34498
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.driver.host = xxx.xxxx.com
spark.sql.hive.metastore.jars = ${env:HADOOP_COMMON_HOME}/../hive/lib/*:${env:HADOOP_COMMON_HOME}/client/*
spark.dynamicAllocation.maxExecutors = 30
spark.executorEnv.PYTHONPATH = /opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/:/opt/cloudera/parcels/SPARK2/lib/spark2/python:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip
spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.driver.memory = 30g
spark.driver.extraLibraryPath = /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native
spark.driver.memoryOverhead = 9g
spark.ui.enabled = true
spark.driver.appUIAddress = http://xxxx.xxxx.com:4045
spark.executor.id = driver
spark.dynamicAllocation.schedulerBacklogTimeout = 1
spark.app.id = application_1598440892293_34498
spark.yarn.jars = local:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/jars/*
spark.app.name = PySparkShell
spark.sql.hive.metastore.version = 1.1.0
spark.yarn.config.gatewayPath = /opt/cloudera/parcels
spark.extraListeners = com.cloudera.spark.lineage.NavigatorAppListener
spark.shuffle.encryption.enabled = true
spark.sql.warehouse.dir = / user / hive / warehouse
spark.sql.catalogImplementation = hive
spark.driver.extraClasspath = /apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar:/path/to/spark-avro_2.11-2.4.0.jar
spark.yarn.config.replacementPath = {{HADOOP_COMMON_HOME}}/../../..
spark.yarn.historyServer.address = http://xxxx.xxxx.com:18089
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.RM_HA_URLS = xxxx.xxxx.com:8090,xxxx.xxxx.com:8090
spark.authenticate = true
spark.port.maxRetries = 1000
spark.lineage.log.dir = /hadooplog/spark2/lineage
spark.eventLog.dir = hdfs://nameservice1/user/spark/spark2ApplicationHistory
spark.sql.files.maxPartitionBytes = 67108864
spark.ui.killEnabled = true
spark.yarn.secondary.jars = ojdbc7.jar
spark.executor.cores = 4
spark.dynamicAllocation.executorIdleTimeout = 60
spark.yarn.am.extraLibraryPath = /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native
spark.io.encryption.enabled = true
spark.dynamicAllocation.initialExecutors = 3
spark.serializer.objectStreamReset = 100
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES = https://lxe1731.allstate.com:8090/proxy/application_1598440892293_34498,https://xxxx.xxxx.com:8090/proxy/application_1598440892293_34498
spark.submit.deployMode = client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS = xxxx.xxxx.com,xxxx.xxxx.com
spark.io.compression.codec = snappy
spark.shuffle.service.enabled = true
spark.executor.memory = 10g
spark.repl.local.jars = file:///apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar
spark.driver.port = 41668
spark.executor.extraLibraryPath = /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native
spark.shuffle.service.port = 7337
spark.unsafe.sorter.spill.read.ahead.enabled = false
spark.lineage.enabled = true
spark.master = yarn
spark.debug.maxToStringFields = 10000
spark.rdd.compress = True
spark.dynamicAllocation.minExecutors = 0
spark.yarn.isPython = true
spark.dynamicAllocation.enabled = true
spark.executor.extraClasspath = /apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar:/home/rmchh/jars/spark-avro_2.11-2.4.0.jar
spark.ui.showConsoleProgress = true
spark.yarn.dist.jars = file:///apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar
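For reference, the raw key/value list returned by getConf().getAll() can be sorted and printed one pair per line. A minimal pretty-printing sketch, assuming the same pm = PySparkManager() handle used in the driver code below:

# Pretty-print the configuration dumped above, one sorted "key = value" pair per line.
for key, value in sorted(pm.spark.sparkContext.getConf().getAll()):
    print(key, "=", value)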
Gist of the pyspark driver code:
from SparkWrapper import PySparkManager
import json
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField

# Create StructType schema from json file:
with open('schema/pmdm_final_schema.json') as f:
    json_schema = json.load(f)
schema = StructType.fromJson(json_schema)

pm = PySparkManager()
spark = pm.spark

path_to_partition = "/data/res/warehouse/mydata/partition_3/APL_00000.avro"
df = spark.read.format("avro").schema(schema).load(path_to_partition).limit(10)
df.cache()

# Repartition and write as parquet
df.repartition(15).write.mode("overwrite").parquet("/data/res/warehouse/mydata/as_parquet")
Unfortunately, this repartition-and-write never succeeds. It looks like only one task is created. I am not sure how to make sure multiple tasks are created so that my executors and their memory are actually used.
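A minimal diagnostic sketch for narrowing this down, assuming the same spark, schema, and path_to_partition as above (the partition-count checks are plain pyspark; the 64 MB figure comes from the spark.sql.files.maxPartitionBytes = 67108864 setting in the config dump):

# How many partitions (and therefore tasks) does each step of the plan produce?
df = spark.read.format("avro").schema(schema).load(path_to_partition)
print(df.rdd.getNumPartitions())  # scan parallelism: roughly input size / 64 MB

# limit() runs as a global limit, which funnels the surviving rows into a
# single partition -- a likely reason only one task appears while .limit(10)
# is part of the plan.
print(df.limit(10).rdd.getNumPartitions())  # typically 1

# Repartitioning fans the data back out across executors before the write:
df_out = df.repartition(15)
print(df_out.rdd.getNumPartitions())  # 15
df_out.write.mode("overwrite").parquet("/data/res/warehouse/mydata/as_parquet")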