问题描述
我正在使用下面的简单代码行从Azure Databricks将数据加载到sql DB
val loadsqldb = spark.sql("""SELECT * FROM TABLEA""")
// WRITE FROM CONfig
val writeConfig = Config(Map(
"url" -> url,"databaseName" -> databaseName,"dbTable" -> "dbo.TABLENAME","user" -> user,"password" -> password,"connectTimeout" -> "5"
))
//~
loadsqldb.write.mode(SaveMode.Overwrite).option("truncate",true).sqlDB(writeConfig)
我们在服务器上必须保留一个唯一的ID密钥,该密钥如下所示:
CREATE UNIQUE INDEX i__NeighbourhoodCategoryHourlyForecast_unique
ON almanac_devdb.dbo.NeighbourhoodCategoryHourlyForecast (fk_neighbourhoods,fk_categories,local_date,local_hour)
GO
Cannot insert duplicate key row in object 'dbo.TABLENAME' with unique index 'i__TABLENAME_unique'. The duplicate key value is (36983,130000,2020-08-12,14).
有人建议我找到某种方法让Databricks对OVERWRITE进行合并,但是我不确定该怎么做,即使那是正确的选择,也不确定吗?
解决方法
我建议遵循以下步骤(不要在spark端进行任何更改,而应在sql server端执行以下步骤)-
- Create view在
target_table
的顶部,您要在其中写入spark数据帧数据 - 创建INSTEAD OF INSERT Trigger的方式应使所有插入命令都应通过在步骤#1中创建的视图进行操作
CREATE TRIGGER <trigger_name>
ON <view_created_in_step_1>
INSTEAD OF INSERT
AS
BEGIN
Merge statment...
END
- 用于插入和更新singe语句的合并语句。 Follow this tutorial.
您可能还希望查看this tutorial,以便与交易相关的查询在键(不匹配)匹配的情况下更新或插入表
,删除唯一索引键的重复项。
df.dropDuplicates(Array("col1","col2"))
然后,尝试写入数据库。