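Problem description
I am creating a data pipeline in Azure Synapse.
Basic flow:
Grab some CSV files of 837 EDI data. Put those data files on Azure Data Lake (Gen2). For each file, load the data into a tabular table in a Spark database named claim. (See my flow.)
My problem: the run time is long. It seems that every file has to create a new Spark session, and the overhead is too high (3 minutes each). I want to "declare" one session via appName and use it throughout. I have 3 test files: one with 10 rows, another with 2 rows, and a third with 10 rows. Those 22 rows take 12 minutes in total.
In my flow you can see that each Foreach loop has 2 activities (one is a notebook, the other a stored procedure), depending on whether the file is an 837i or an 837p.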
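To put the timings above in perspective, here is a back-of-the-envelope sketch. It assumes exactly one session start per file, which is what the flow appears to do; the function and constant names are only illustrative.

```python
# Rough cost model for the timings quoted above (all figures in minutes).
SESSION_STARTUP_MIN = 3   # per-session startup reported in the question
TOTAL_RUNTIME_MIN = 12    # observed end-to-end time for the test run
FILE_COUNT = 3            # number of test files


def startup_overhead(file_count, startup_min):
    """Minutes spent starting sessions when every file gets its own session."""
    return file_count * startup_min


# Assumption: one session start per file.
per_file_sessions = startup_overhead(FILE_COUNT, SESSION_STARTUP_MIN)

# Whatever is left is the actual work (read, transform, write, sproc).
remaining_work = TOTAL_RUNTIME_MIN - per_file_sessions

# If a single session could be shared, startup would be paid only once.
shared_session_estimate = SESSION_STARTUP_MIN + remaining_work

print(per_file_sessions)        # 9
print(shared_session_estimate)  # 6
```

Under that assumption, roughly three quarters of the run is session startup, which is why sharing one session looks attractive.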
My notebook code:
```python
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# srcFilesDirectory, srcFileName, srcFileDelimeter and sparkdbname are not
# defined in this cell; they are supplied to the notebook from outside.

# Create the Spark session with the necessary configuration.
spark = (
    SparkSession.builder
    .appName("837App")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "10s")
    .getOrCreate()
)

# Prepare the variables for the source file name.
srcPath = "abfss://folder@server.dfs.core.windows.net"
srcFQFN = f"{srcPath}/{srcFilesDirectory}/{srcFileName}"
dstTableName = "raw_837i"

# Read the flat file into a DataFrame.
df = spark.read.load(srcFQFN, format="csv", delimiter=srcFileDelimeter, header=True)

# Add an auto-incrementing ID column.
adf = df.withColumn("AutoID", row_number().over(Window.orderBy(monotonically_increasing_id())))

# Clean up the column names.
adf = adf.toDF(*(re.sub(r"[\.\s\(\)\-]+", "_", c) for c in adf.columns))

# Now the Spark database side...
# Create the destination database if it does not exist.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {sparkdbname}")

# Write the DataFrame to a Spark table. Change the mode from "overwrite" to
# "append" if we just want to insert.
adf.write.mode("overwrite").saveAsTable(f"{sparkdbname}.{dstTableName}")
```
Thanks to @Sequinex and @Bendemann.
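For reference, the column-name cleanup in the notebook can be tried on its own without Spark. This is a small sketch; the sample column names are made up for illustration and are not taken from the real 837 files.

```python
import re


def clean_column_name(name):
    """Apply the same substitution the notebook uses on each column name."""
    # Runs of dots, whitespace, parentheses and hyphens collapse to one "_".
    return re.sub(r"[\.\s\(\)\-]+", "_", name)


# Hypothetical column names, for illustration only.
sample_columns = ["Claim ID", "Billed Amt ($)", "Service-Date", "payer.name"]
cleaned = [clean_column_name(c) for c in sample_columns]

print(cleaned)
# ['Claim_ID', 'Billed_Amt_$_', 'Service_Date', 'payer_name']
```

Note that characters outside the pattern (such as `$`) are left in place, and a trailing parenthesis leaves a trailing underscore.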
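What I have tried:
I added a notebook at the start of the pipeline to set up the session; see "Set 837 env" in my flow. The intent is that if a session with that appName does not exist, it gets created there and is then reused later. That way I spend the 3 minutes of startup time once at the front of the pipeline instead of on every file.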
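Another way to pay the startup cost only once might be to hand a single notebook run the whole list of files instead of one file per Foreach iteration. The sketch below assumes the pipeline can pass such a list and that all files in one call share the same layout (so 837i and 837p files would need separate calls). `build_source_paths`, `read_all_files` and the `incoming` directory are hypothetical names; the only Spark call used is `DataFrameReader.csv`, which accepts a list of paths.

```python
def build_source_paths(src_path, directory, file_names):
    """Build one fully qualified path per file name."""
    return [f"{src_path}/{directory}/{name}" for name in file_names]


def read_all_files(spark, paths, delimiter):
    """Read every file in `paths` into one DataFrame within a single session."""
    # DataFrameReader.csv accepts either a single path or a list of paths.
    return spark.read.csv(paths, sep=delimiter, header=True)


# Hypothetical directory and file names, for illustration only.
paths = build_source_paths(
    "abfss://folder@server.dfs.core.windows.net",
    "incoming",
    ["a.csv", "b.csv", "c.csv"],
)
print(paths[0])
# abfss://folder@server.dfs.core.windows.net/incoming/a.csv
```

In the notebook this would be called as `read_all_files(spark, paths, srcFileDelimeter)`, followed by the same AutoID, column cleanup and `saveAsTable` steps as before.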
```python
from pyspark.sql import SparkSession

# Create the Spark session with the necessary configuration.
spark = (
    SparkSession.builder
    .appName("837App")
    .config("spark.network.timeout", "10s")
    .getOrCreate()
)
sc = spark.sparkContext
```
I cannot prove that it is actually using this appName (so if someone can help with that as well, please do).
I tried:
```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("837App").getOrCreate()
# sc is not defined in this cell; it is the context the notebook already provides.
sc
```
Result:
<SparkContext master=yarn appName=Synapse_sparkPrimary_1618935780>
Shouldn't it be appName=837App?
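For context, `getOrCreate()` returns the session that is already running if there is one, and the application name is fixed when the context starts. That would be consistent with the output above, assuming the notebook had already started its own session before this cell ran. A small helper for checking which application a session really belongs to might look like the sketch below; `describe_session` and the fake objects are hypothetical and exist only so the example runs without a cluster.

```python
from types import SimpleNamespace


def describe_session(spark):
    """Collect the identifiers of the Spark application behind `spark`."""
    sc = spark.sparkContext
    return {
        "app_name": sc.appName,                             # name fixed at context start
        "application_id": sc.applicationId,                 # ID assigned by the cluster manager
        "conf_app_name": spark.conf.get("spark.app.name"),  # same name, read via runtime conf
    }


class _FakeConf:
    """Stand-in for spark.conf so the sketch runs outside a cluster."""

    def __init__(self, values):
        self._values = values

    def get(self, key):
        return self._values[key]


# Fake session mirroring the name reported in the question; the ID is made up.
fake_spark = SimpleNamespace(
    sparkContext=SimpleNamespace(
        appName="Synapse_sparkPrimary_1618935780",
        applicationId="application_0000000000000_0001",
    ),
    conf=_FakeConf({"spark.app.name": "Synapse_sparkPrimary_1618935780"}),
)

info = describe_session(fake_spark)
print(info["app_name"])
# Synapse_sparkPrimary_1618935780
```

In a real notebook the call would simply be `describe_session(spark)`; comparing `application_id` across notebook activities shows whether they share one application.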
I also tried stopping the existing session and starting my own:
```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Stop the context the notebook already provides, then try to build a new session.
sc.stop()
spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```
But I get the following error:
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
at scala.concurrent.Promise$class.success(Promise.scala:86)
at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:157)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$sparkContextInitialized(ApplicationMaster.scala:392)
at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:808)
at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:566)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py",line 136,in __init__
conf,jsc,profiler_cls)
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py",line 198,in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py",line 306,in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",line 1525,in __call__
answer,self._gateway_client,None,self._fqn)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",line 69,in deco
return f(*a,**kw)
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",line 328,in get_return_value
format(target_id,".",name),value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
at scala.concurrent.Promise$class.success(Promise.scala:86)
at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:157)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$sparkContextInitialized(ApplicationMaster.scala:392)
at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:808)
at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:566)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
TIA