问题描述
我想使用Azure Data Bricks中的sql将Spark表的结果插入到新的sql Synapse表中。
我尝试了以下解释[https://docs.microsoft.com/zh-cn/azure/databricks/spark/latest/spark-sql/language-manual/sql-ref-Syntax-ddl-create-表数据源],但我没有运气。
必须根据 SELECT 语句创建Synapse表。源应该是Spark / Data Bricks临时视图或Parquet源。
例如临时表
# Load Taxi Location Data from Azure Synapse Analytics
jdbcUrl = "jdbc:sqlserver://synapsesqldbexample.database.windows.net:number;
database=SynapseDW" #Replace "suffix" with your own
connectionProperties = {
"user" : "usernmae1","password" : "password2","driver" : "com.microsoft.sqlserver.jdbc.sqlServerDriver"
}
pushdown_query = '(select * from NYC.TaxiLocationLookup) as t'
dfLookupLocation = spark.read.jdbc(url=jdbcUrl,table=pushdown_query,properties=connectionProperties)
dfLookupLocation.createOrReplaceTempView('NYCTaxiLocation')
display(dfLookupLocation)
例如 Source Synapse DW
服务器: synapsesqldbexample.database.windows.net
数据库: [SynapseDW]
模式: [NYC]
表: [TaxiLocationLookup]
接收器/目标表(尚不存在):
服务器: synapsesqldbexample.database.windows.net
数据库: [SynapseDW]
模式: [NYC]
新表:[TEST_NYCTaxiData]
我尝试的sql语句:
%sql
CREATE TABLE if not exists TEST_NYCTaxiLocation
select *
from NYCTaxiLocation
limit 100
解决方法
如果使用com.databricks.spark.sqldw驱动程序,则将需要一个Azure存储帐户和一个已设置的容器。一旦到位,实际上很容易做到这一点。
-
在Azure Databricks中配置BLOB凭据,我使用in Notebook方法
-
创建JDBC连接字符串和BLOB
-
将您的SELECT语句读入RDD / Dataframe和RDD / Dataframe
-
使用.write函数将数据框向下推送到Azure Synapse
配置BLOB凭据
spark.conf.set( “ fs.azure.account.key..blob.core.windows.net”, “”)
配置JDBC和BLOB路径
jdbc =“ jdbc:sqlserver://.database.windows.net:1433; database =; user = @; password =; encrypt = true; trustServerCertificate = false; hostNameInCertificate = *。database.windows.net; loginTimeout = 30;“ blob =“ wasbs://@.blob.core.windows.net/”
将数据从突触读入数据帧
df = spark.read
.format(“ com.databricks.spark.sqldw”)
.option(“ url”,jdbc)
.option(“ tempDir”,blob)
.option(“ forwardSparkAzureStorageCredentials”,“ true”)
.option(“ Query”,“ SELECT TOP 1000 * FROM ORDER BY NEWID()”)
.load()将数据帧中的数据写回到天青突触
df.write
.format(“ com.databricks.spark.sqldw”)
.option(“ url”,jdbc)
.option(“ forwardSparkAzureStorageCredentials”,“ true”)
.option(“ dbTable”,“ YOURTABLENAME”)
.option(“ tempDir”,blob)
.mode(“ overwrite”)
.save()
@JPVoogt解决方案之外的另一个选项是,在存储帐户中创建了实木复合地板文件后,在Synapse池中使用CTAS。您可以执行复制命令或外部表。
一些参考文献:
https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/develop-tables-cetas https://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/quickstart-bulk-load-copy-tsql