How do I create a session that multiple notebooks can use in Azure Synapse Analytics with PySpark?

Problem Description

I am building a data pipeline in Azure Synapse.

The basic flow:

1. Get some CSV files of 837 EDI data.
2. Land those data files in Azure Data Lake (Gen2).
3. For each file, load the data into a tabular table named claim in the Spark DB.

See my flow:

(image: pipeline flow diagram, "my flow")

My problem: long run times. It seems that every file has to create a new Spark session, and the overhead is too much (about 3 minutes each). I want to "declare" a session via appName and use it throughout the run. I have 3 test files: one with 10 rows, another with 2 rows, and a third with 10 rows. 22 rows total, 12 minutes total.

In my flow you can see that each Foreach loop has 2 activities (one notebook, one sproc) depending on whether the file is 837i or 837p.

My notebook code:

```python
import re
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# create the Spark session with the necessary configuration
spark = (SparkSession
    .builder
    .appName("837App")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "10s")
    .getOrCreate())

# prep the variables for the source file name
srcPath = "abfss://folder@server.dfs.core.windows.net"
srcFQFN = f"{srcPath}/{srcFilesDirectory}/{srcFileName}"
dstTableName = "raw_837i"

# read the flat file into a data frame
df = spark.read.load(srcFQFN, format="csv", delimiter=srcFileDelimeter, header=True)

# add an AutoID column
adf = df.withColumn("AutoID", row_number().over(Window.orderBy(monotonically_increasing_id())))

# clean up the column names
adf = adf.toDF(*(re.sub(r"[\.\s\(\)\-]+", "_", c) for c in adf.columns))

# Now the Spark database side...
# create the destination database if it does not exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {sparkdbname}")

# write the dataframe to a Spark table; change mode from overwrite to append to insert instead
adf.write.mode("overwrite").saveAsTable(f"{sparkdbname}.{dstTableName}")
```
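Note that `srcFilesDirectory`, `srcFileName`, `srcFileDelimeter`, and `sparkdbname` are not defined in the snippet above; they come into the notebook as parameters from the pipeline. For reference, a minimal sketch of what the parameters cell looks like (the values below are hypothetical placeholders; the pipeline's notebook activity overrides them at run time):

```python
# Parameters cell (toggled as a parameter cell in Synapse Studio).
# The notebook activity's base parameters overwrite these defaults at run time.
# All values here are made-up placeholders.
srcFilesDirectory = "incoming/837i"   # folder under srcPath
srcFileName = "sample_claims.csv"     # file dropped by the pipeline
srcFileDelimeter = "|"                # column delimiter (spelling as used above)
sparkdbname = "claims"                # destination Spark database
```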

Thanks @Sequinex and @Bendemann.

What I have tried:

Added a notebook at the beginning of the pipeline to set up the session; see Set 837 env in my flow. The intent is that if a session with that appName does not exist, it gets created there, before anything else uses it. That way I pay the 3 minutes of startup cost once at the front of the pipeline instead of once per file.

```python
from pyspark.sql import SparkSession

# create the Spark session with the necessary configuration
spark = (SparkSession
    .builder
    .appName("837App")
    .config("spark.network.timeout", "10s")
    .getOrCreate())

sc = spark.sparkContext
```

I can't prove that it is actually using this appName (so if someone can help with that too, great).
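A minimal sketch of how the current session can be inspected (assuming the `spark` and `sc` objects that a Synapse notebook pre-creates):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

print(sc.applicationId)                  # YARN application id of the underlying Livy session
print(sc.appName)                        # appName the context was actually created with
print(spark.conf.get("spark.app.name"))  # same value, read via the runtime conf
```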

I tried:

```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```

Result:

```
<SparkContext master=yarn appName=Synapse_sparkPrimary_1618935780>
```

Shouldn't it be appName=837App?
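My understanding (sketched below, runnable outside Synapse) is that `getOrCreate()` hands back any context that already exists and only applies `appName` when it has to build a new one, which would explain the output above; in a Synapse notebook, Livy creates the context before the first cell runs:

```python
from pyspark.sql import SparkSession

# Standalone sketch of getOrCreate() semantics: the second builder's
# appName is ignored because a context already exists.
s1 = (SparkSession.builder
      .master("local[1]")    # local master so the sketch runs outside a cluster
      .appName("first")
      .getOrCreate())
s2 = SparkSession.builder.appName("837App").getOrCreate()

print(s1 is s2)                  # True: the same session came back
print(s2.sparkContext.appName)   # "first", not "837App"
```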

I also tried stopping the existing session and starting mine:

```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

sc.stop()
spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```

But I get the following error:

```
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
    at scala.concurrent.Promise$class.success(Promise.scala:86)
    at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$sparkContextInitialized(ApplicationMaster.scala:392)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:808)
    at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:566)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    conf, jsc, profiler_cls)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)

  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
    answer, self._gateway_client, None, self._fqn)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    return f(*a, **kw)

  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
    at scala.concurrent.Promise$class.success(Promise.scala:86)
    at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$sparkContextInitialized(ApplicationMaster.scala:392)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:808)
    at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:566)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```

TIA
