Databricks - 写入 Azure Synapse 时出错

问题描述

我正在尝试使用以下代码将数据写入带有标识字段的 Azure Synapse 表

数据块上的代码

def get_jdbc_connection(host,sqlDatabase,user,password):
  jdbcHostname = "{}.database.windows.net".format(host)
  jdbc_url = "jdbc:sqlserver://{}:1433;database={};user={}@{};password={};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;".format(jdbcHostname,host,password)
  url = "jdbc:sqlserver://{}:1433;database={};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;".format(jdbcHostname,sqlDatabase)
  return (jdbc_url,url )

def write_adw(spark,df_target_adw,jdbc_url,table,tempDir,option_mode,pre_Actions ):
    df_target_adw.write.format("com.databricks.spark.sqldw") \
        .option("url",jdbc_url) \
        .option("useAzureMSI","true") \
        .option("preActions",pre_Actions) \
        .option("dbTable",table) \
        .option("tempDir",tempDir) \
        .mode(option_mode) \
        .save()
    
dftraffic = spark.sql('SELECT distinct SourceName\,1 AS IsActiveRow \,"Pipe-123" as pipelineId \,current_timestamp as ADFCreatedDateTime \,current_timestamp as ADFModifiedDateTime \
              from deltaTable')

#write to ADW
(jdbc_url,url_adw) = get_jdbc_connection(host,sqlDatawarehouse,password)
target_table = 'TargetTable_name'
option_mode= "append"
pre_Actions= " SELECT GETDATE()"
write_adw(spark,dftraffic,target_table,pre_Actions )

adw 上目标表的架构

列名 数据类型
SourceSID INT IDENTITY (1,1) 非空
名称 VARCHAR(20) 非空
IsRowActive 位非空
管道 ID VARCHAR(20) 非空
ADFCreatedDateTime 日期时间不能为空
ADFModifiedDateTime 日期时间不能为空

有关数据块的配置详细信息

Databricks 运行时 7.4(包括 Apache Spark 3.0.1、Scala 2.12)

错误信息

Py4JJavaError:调用 o457.save 时发生错误。 :com.databricks.spark.sqldw.sqlDWSideException:Azure Synapse Analytics 无法执行连接器生成的 JDBC 查询。 基础 sqlException(s): - com.microsoft.sqlserver.jdbc.sqlServerException: 表中标识列的显式值只能在使用列列表且 IDENTITY_INSERT 为 ON 时指定

代码在 databricks 运行时 6.4 Spark 2.4.5 上运行良好,当我尝试升级 dbk 运行时时就遇到了这个错误。 我怎样才能让它工作?

解决方法

您是否没有额外的行“1 AS IsActiveRow”。我在架构中没有看到

dftraffic = spark.sql('SELECT distinct SourceName\,1 AS IsActiveRow \,"Pipe-123" as pipelineId \,current_timestamp as ADFCreatedDateTime \,current_timestamp as ADFModifiedDateTime \
              from deltaTable)