问题描述
这是表结构;
code name under
1 National Sales Manager 1
2 regional sales manager 1
3 area sales manager 2
4 sales manager 3
如何获得如下所示的顶级父层次结构;
code name under ultimateparent
1 National Sales Manager 1 1
2 regional sales manager 1 1
3 area sales manager 2 1
4 sales manager 3 1
在常规 sql 服务器上,我将使用递归 CTE,如 SQL Server function to get top level parent in hierarchy 所示。但是,突触数据库不支持它。
解决方法
您有几个选择。我将在这里描述三个:
- 如果您附近有一个 Azure SQL DB 并且卷不是太大,请使用
CREATE EXTERNAL TABLE
显示该 Azure SQL DB 中的表或简单地使用 Azure 数据工厂 (ADF) 复制数据,执行您的递归 CTE 然后使用 ADF 将其移植回来。在此数据进入您的 SQL 池之前,或者使用某种预处理。 - 递归 CTE 只是一天结束时的一种循环,因此 Synapse 支持
WHILE
。现在很明显,这种类型的循环不能很好地转换为 Synapse,因为它很健谈,但对于具有低层次深度的小体积来说可能是一种选择。由您来检查以这种方式无效地使用 MPP 架构与编写替代方案之间的权衡。
我编写了一个选项 2 的示例,它只运行了几行就花费了 20 多秒。通常我会认为这是不可接受的,但如前所述,由您来权衡替代方案:
IF OBJECT_ID('dbo.someHierarchy') IS NOT NULL
DROP TABLE dbo.someHierarchy;
CREATE TABLE dbo.someHierarchy (
code INT NOT NULL,[name] VARCHAR(50) NOT NULL,under INT NOT NULL
)
WITH
(
DISTRIBUTION = ROUND_ROBIN,HEAP
);
INSERT INTO dbo.someHierarchy ( code,[name],under )
SELECT 1,'National Sales Manager',1
UNION ALL
SELECT 2,'Regional Sales Manager',1
UNION ALL
SELECT 3,'Area Sales Manager',2
UNION ALL
SELECT 4,'Sales Manager',3
INSERT INTO dbo.someHierarchy ( code,under )
SELECT 5,'Lead Bob',5
UNION ALL
SELECT 6,'Main Bob',5
UNION ALL
SELECT 7,'Junior Bob 1',6
UNION ALL
SELECT 8,'Junior Bob 2',6
INSERT INTO dbo.someHierarchy ( code,under )
SELECT 9,'Jim - CEO',9
UNION ALL
SELECT 10,'Tim - CFO',9
UNION ALL
SELECT 11,'Rob - CIO',9
UNION ALL
SELECT 12,'Bob - VP',10
UNION ALL
SELECT 13,'Shon - Director',12
UNION ALL
SELECT 14,'Shane - VP',11
UNION ALL
SELECT 15,'Sheryl - VP',11
UNION ALL
SELECT 16,'Dan - Director',15
UNION ALL
SELECT 17,'Kim - Director',15
UNION ALL
SELECT 18,'Carlo - PM',16
UNION ALL
SELECT 19,'Monty - Sr Dev',18
UNION ALL
SELECT 20,'Chris - Sr Dev',18
IF OBJECT_ID('tempdb..#tmp') IS NOT NULL DROP TABLE #tmp;
CREATE TABLE #tmp (
xlevel INT NOT NULL,code INT NOT NULL,[name] VARCHAR(50) NOT NULL,under INT NOT NULL,ultimateParent INT NOT NULL
);
-- Insert first level; similar to anchor section of CTE
INSERT INTO #tmp ( xlevel,code,under,ultimateParent )
SELECT 1 AS xlevel,under AS ultimateParent
FROM dbo.someHierarchy
WHERE under = code;
-- Loop section
DECLARE @i INT = 1
WHILE EXISTS (
SELECT * FROM dbo.someHierarchy h
WHERE NOT EXISTS ( SELECT * FROM #tmp t WHERE h.code = t.code )
)
BEGIN
-- Insert subsequent levels; similar to recursive section of CTE
INSERT INTO #tmp ( xlevel,ultimateParent )
SELECT t.xlevel + 1,h.code,h.[name],h.under,t.ultimateParent
FROM #tmp t
INNER JOIN dbo.someHierarchy h ON t.code = h.under
WHERE h.under != h.code
AND t.xlevel = @i;
-- Increment counter
SET @i += 1
-- Loop guard
IF @i > 99
BEGIN
RAISERROR( 'Too many loops!',16,1 )
BREAK
END
END
SELECT 'loop' s,*
FROM #tmp
ORDER BY code,xlevel;
结果:
条件是 WHILE EXISTS
循环是一种特别昂贵的方法,所以也许有一种更简单的方法来处理您的数据。
第三种选择是使用 Azure Synapse Notebook 和库(如 GraphFrames)来遍历层次结构。有更简单的方法可以做到这一点,但我发现 Connected Components 方法能够确定最终的管理者。使用 GraphFrames 的一个优点是它允许更复杂的图形查询,例如,如果需要,可以使用motifs。 此笔记本使用的是 Spark (Scala) 版本:
将正确版本的 graphFrames library 上传到 Spark:
%%configure -f
{
"conf": {
"spark.jars": "abfss://{yourContainer}@{yourDataLake}.dfs.core.windows.net/synapse/workspaces/{yourWorkspace}/sparkpools/{yourSparkpool}/libraries/graphframes-0.8.1-spark2.4-s_2.11.jar",}
}
使用大括号为您的环境配置元素。
导入相关库:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
从专用 SQL 池中获取数据并将其分配给数据帧:
// Get a table from Synapse dedicated SQL pool,select / rename certain columns from it to vertices and edge dataframes
val df = spark.read.synapsesql("yourDB.dbo.someHierarchy")
val v = df.selectExpr("code AS id","name AS empName","under")
v.show
// Reformat the code/under relationship from the original table
// NB Exclude because in graph terms these don't have an edge
val e = df.selectExpr("code AS src","under AS dst","'under' AS relationship").where("code != under")
e.show
从顶点和边数据框创建图形框:
// Create the graph frame
val g = GraphFrame(v,e)
print(g)
为 connectedComponents 设置检查点:
// The connected components adds a component id to each 'group'
// Set a checkpoint to start
sc.setCheckpointDir("/tmp/graphframes-azure-synapse-notebook")
对数据运行连通分量算法:
// Run connected components algorithm against the data
val cc = g.connectedComponents.run() // doesn't work on Spark 1.4
display(cc)
加入原始顶点数据帧和连通分量算法的结果,并将其写回 Azure Synapse 专用 SQL 池:
val writeDf = spark.sqlContext.sql ("select v.id,v.empName,v.under,cc.component AS ultimateManager from v inner join cc on v.id = cc.id")
//display(writeDf)
writeDf.write.synapsesql("someDb.dbo.someHierarchy2",Constants.INTERNAL)
结果:
我感觉有一种更简单的方法可以用笔记本来实现这一点,但期待看到一些替代方案。在此处为 Synapse 上的递归 CTE 的反馈项点赞:
,您是否考虑或尝试将数据放入 json 文件并使用 Synapse 数据流为您扁平化层次结构?