如何使用Spark Temp表在Databricks中使用select查询将数据插入表

问题描述

我想使用Azure Data Bricks中的sql将Spark表的结果插入到新的sql Synapse表中。

我尝试了以下解释[https://docs.microsoft.com/zh-cn/azure/databricks/spark/latest/spark-sql/language-manual/sql-ref-Syntax-ddl-create-表数据源],但我没有运气。

必须根据 SELECT 语句创建Synapse表。源应该是Spark / Data Bricks临时视图或Parquet源。

例如临时表

    # Load Taxi Location Data from Azure Synapse Analytics
        
        jdbcUrl = "jdbc:sqlserver://synapsesqldbexample.database.windows.net:number;
database=SynapseDW" #Replace "suffix" with your own  
        connectionProperties = {
          "user" : "usernmae1","password" : "password2","driver" : "com.microsoft.sqlserver.jdbc.sqlServerDriver"
        }
        
        pushdown_query = '(select * from NYC.TaxiLocationLookup) as t'
        dfLookupLocation = spark.read.jdbc(url=jdbcUrl,table=pushdown_query,properties=connectionProperties)
        
        dfLookupLocation.createOrReplaceTempView('NYCTaxiLocation')
        
        display(dfLookupLocation)

例如 Source Synapse DW

服务器: synapsesqldbexample.database.windows.net

数据库 [SynapseDW]

模式: [NYC]

表: [TaxiLocationLookup]

接收器/目标表(尚不存在):

服务器: synapsesqldbexample.database.windows.net

数据库 [SynapseDW]

模式: [NYC]

新表:[TEST_NYCTaxiData]

我尝试的

sql语句:

%sql
CREATE TABLE if not exists TEST_NYCTaxiLocation 
select *
from NYCTaxiLocation
limit 100

解决方法

如果使用com.databricks.spark.sqldw驱动程序,则将需要一个Azure存储帐户和一个已设置的容器。一旦到位,实际上很容易做到这一点。

  1. 在Azure Databricks中配置BLOB凭据,我使用in Notebook方法

  2. 创建JDBC连接字符串和BLOB

  3. 将您的SELECT语句读入RDD / Dataframe和RDD / Dataframe

  4. 使用.write函数将数据框向下推送到Azure Synapse

    配置BLOB凭据

    spark.conf.set( “ fs.azure.account.key..blob.core.windows.net”, “”)

    配置JDBC和BLOB路径

    jdbc =“ jdbc:sqlserver://.database.windows.net:1433; database =; user = @; password =; encrypt = true; trustServerCertificate = false; hostNameInCertificate = *。database.windows.net; loginTimeout = 30;“ blob =“ wasbs://@.blob.core.windows.net/”

    将数据从突触读入数据帧

    df = spark.read
    .format(“ com.databricks.spark.sqldw”)
    .option(“ url”,jdbc)
    .option(“ tempDir”,blob)
    .option(“ forwardSparkAzureStorageCredentials”,“ true”)
    .option(“ Query”,“ SELECT TOP 1000 * FROM ORDER BY NEWID()”)
    .load()

    将数据帧中的数据写回到天青突触

    df.write
    .format(“ com.databricks.spark.sqldw”)
    .option(“ url”,jdbc)
    .option(“ forwardSparkAzureStorageCredentials”,“ true”)
    .option(“ dbTable”,“ YOURTABLENAME”)
    .option(“ tempDir”,blob)
    .mode(“ overwrite”)
    .save()

,

@JPVoogt解决方案之外的另一个选项是,在存储帐户中创建了实木复合地板文件后,在Synapse池中使用CTAS。您可以执行复制命令或外部表。

一些参考文献:

https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/develop-tables-cetas https://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/quickstart-bulk-load-copy-tsql