问题描述
我正在尝试使用以下代码将数据写入带有标识字段的 Azure Synapse 表
数据块上的代码
def get_jdbc_connection(host,sqlDatabase,user,password):
jdbcHostname = "{}.database.windows.net".format(host)
jdbc_url = "jdbc:sqlserver://{}:1433;database={};user={}@{};password={};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;".format(jdbcHostname,host,password)
url = "jdbc:sqlserver://{}:1433;database={};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;".format(jdbcHostname,sqlDatabase)
return (jdbc_url,url )
def write_adw(spark,df_target_adw,jdbc_url,table,tempDir,option_mode,pre_Actions ):
df_target_adw.write.format("com.databricks.spark.sqldw") \
.option("url",jdbc_url) \
.option("useAzureMSI","true") \
.option("preActions",pre_Actions) \
.option("dbTable",table) \
.option("tempDir",tempDir) \
.mode(option_mode) \
.save()
dftraffic = spark.sql('SELECT distinct SourceName\,1 AS IsActiveRow \,"Pipe-123" as pipelineId \,current_timestamp as ADFCreatedDateTime \,current_timestamp as ADFModifiedDateTime \
from deltaTable')
#write to ADW
(jdbc_url,url_adw) = get_jdbc_connection(host,sqlDatawarehouse,password)
target_table = 'TargetTable_name'
option_mode= "append"
pre_Actions= " SELECT GETDATE()"
write_adw(spark,dftraffic,target_table,pre_Actions )
adw 上目标表的架构
列名 | 数据类型 |
---|---|
SourceSID | INT IDENTITY (1,1) 非空 |
源名称 | VARCHAR(20) 非空 |
IsRowActive | 位非空 |
管道 ID | VARCHAR(20) 非空 |
ADFCreatedDateTime | 日期时间不能为空 |
ADFModifiedDateTime | 日期时间不能为空 |
有关数据块的配置详细信息
Databricks 运行时 7.4(包括 Apache Spark 3.0.1、Scala 2.12)
错误信息
Py4JJavaError:调用 o457.save 时发生错误。 :com.databricks.spark.sqldw.sqlDWSideException:Azure Synapse Analytics 无法执行连接器生成的 JDBC 查询。 基础 sqlException(s): - com.microsoft.sqlserver.jdbc.sqlServerException: 表中标识列的显式值只能在使用列列表且 IDENTITY_INSERT 为 ON 时指定
代码在 databricks 运行时 6.4 Spark 2.4.5 上运行良好,当我尝试升级 dbk 运行时时就遇到了这个错误。 我怎样才能让它工作?
解决方法
您是否没有额外的行“1 AS IsActiveRow”。我在架构中没有看到
dftraffic = spark.sql('SELECT distinct SourceName\,1 AS IsActiveRow \,"Pipe-123" as pipelineId \,current_timestamp as ADFCreatedDateTime \,current_timestamp as ADFModifiedDateTime \
from deltaTable)