问题描述
我需要使用经典的扁平表在BigQuery中执行合并语句,将包含嵌套和重复字段的表作为目标,而我在理解其工作方式方面遇到了麻烦。 Google的示例使用直接值,因此我的语法不太清楚。
使用此示例:
CREATE OR REPLACE TABLE
mydataset.DIM_PERSONA (
IdPersona STRING,Status STRING,Properties ARRAY<STRUCT<
Id STRING,Value STRING,_loadingDate TIMESTAMP,_lastModifiedDate TIMESTAMP
>>,_loadingDate TIMESTAMP NOT NULL,_lastModifiedDate TIMESTAMP
);
INSERT INTO mydataset.DIM_PERSONA
values
('A','KO',[('FamilyMembers','2',CURRENT_TIMESTAMP(),TIMESTAMP(NULL))],TIMESTAMP(NULL)),('B','4',('Pets','1',NULL)],TIMESTAMP(NULL))
;
CREATE OR REPLACE TABLE
mydataset.PERSONA (
IdPersona STRING,IdProperty STRING,Value STRING
);
INSERT INTO mydataset.PERSONA
VALUES('A','OK','Pets','3'),'FamilyMembers','5'),('C','2')
目标是:
此INSERT起作用:
MERGE INTO mydataset.DIM_PERSONA TRG
USING (
SELECT
IdPersona,Status,ARRAY(
SELECT AS STRUCT
IdProperty,Value,TIMESTAMP(NULL)
) Properties,TIMESTAMP(NULL)
FROM mydataset.PERSONA
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (IdPersona,Properties,TIMESTAMP(NULL))
但是我想在INSERT子句中构建嵌套/重复字段,因为对于UPDATE,我还需要(我认为)通过比较TRG和SRC的值来执行“ SELECT AS STRUCT * REPLACE”。 这不起作用:
MERGE INTO mydataset.DIM_PERSONA TRG
USING (
SELECT
*
FROM mydataset.PERSONA
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (
IdPersona,ARRAY(
SELECT AS STRUCT
IdProperty,TIMESTAMP(NULL)
),TIMESTAMP(NULL)
)
即使使用第一个选项,我也无法在UPDATE中引用TRG.properties。
WHEN MATCHED THEN
UPDATE
SET Properties = ARRAY(
SELECT AS STRUCT p_SRC.*
REPLACE (IF(p_SRC.IdProperty=p_TRG.id AND p_SRC.Value<>p_TRG.Value,p_SRC.Value,p_TRG.Value) AS Value)
FROM SRC.Properties p_SRC,TRG.Properties p_TRG
)
虽然这是错误的。
解决这个问题的一种方法,如我所见,是预先联接USING子句中的所有内容,因此在此进行所有替换,但是对于merge语句来说,这是非常错误的。
有人可以帮我解决这个问题吗? :\
解决方法
因此,尽管我仍然希望有另一种方法,但我想分享一个可能的解决方案。 如前所述,我使用CTE和FULL OUTER JOIN预先计算了所需的内容,因此重新创建了以后需要的结构数组(表会相对较小,因此我可以负担得起)。
MERGE INTO mydataset.DIM_PERSONA TRG
USING (
WITH NEW_PROPERTIES AS (
SELECT
COALESCE(idp,IdPersona) IdPersona,ARRAY_AGG((
SELECT AS STRUCT
COALESCE(idpro,Id) IdProperty,COALESCE(vl,Value) Value,COALESCE(_loadingDate,CURRENT_TIMESTAMP) _loadingDate,IF(idp=IdPersona,CURRENT_TIMESTAMP,TIMESTAMP(NULL)) _lastModifiedDate
)) Properties
FROM (
SELECT DIP.IdPersona,DIP.Status,DIP_PR.*,PER.IdPersona idp,PER.Status st,PER.IdProperty idpro,PER.Value vl
FROM `clean-yew-281811.mydataset.DIM_PERSONA` DIP
CROSS JOIN UNNEST(DIP.Properties) DIP_PR
FULL OUTER JOIN mydataset.PERSONA PER
ON DIP.IdPersona=PER.IdPersona
AND DIP_PR.Id=PER.IdProperty
)
GROUP BY IdPersona
)
SELECT
IdPersona,'subquery to do here' Status,NP.Properties
FROM (SELECT DISTINCT IdPersona FROM mydataset.PERSONA) PE
LEFT JOIN NEW_PROPERTIES NP USING (IdPersona)
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (IdPersona,Status,Properties,CURRENT_TIMESTAMP(),TIMESTAMP(NULL))
WHEN MATCHED THEN
UPDATE
SET
TRG.Status = SRC.Status,TRG.Properties = SRC.Properties,TRG._lastModifiedDate = CURRENT_TIMESTAMP()
这可以工作,但是我几乎避免使用语法来更新结构数组,因为我正在做的是重建和替换操作。希望有人可以提出更好的方法。
,此外,虽然您没有提供所需的输出,但我仍然能够根据您描述的目标和您的代码以及您提供的示例数据来创建查询。
遵循以下目标:
- 更新IdPersona ='A',在“属性”中添加新元素并更改状态
- 更新IdPersona ='B',更新属性中的现有元素
- 插入IdPersona ='C'
我没有执行替换和重建操作,而是使用了:
- MERGE;以执行更新并插入新行,例如IdPersona =“ C”
- INSERT:在合并中,无法将INSERT与WHEN MATCHED一起使用。因此,为了在IdPerson =“ A”时添加新的属性,在MERGE操作之后使用了此方法。
- CREATE TABLE:使用INSERT后,由于我们未使用WHEN MATCHED,因此未聚合IdPersona =“ A”时的新属性。因此,将最终表DM_PERSONA替换为正确汇总结果。
- LEFT JOIN:为了添加未汇总到 ARRAY
> 中的字段 _loadingDate 和* _lastModifiedDate *。
下面是带有适当注释的查询:
#first step update current values and insert new IdPersonas
MERGE sample.DIM_PERSONA_test2 T
USING sample.PERSONA_test2 S
ON T.IdPersona = S.IdPersona
#update A but not insert
WHEN MATCHED AND T.IdPersona ="A" THEN
UPDATE SET STATUS = "OK"
#update B
WHEN MATCHED AND T.IdPersona ="B" THEN
UPDATE SET Properties = [( S.IdPersona,S.IdProperty,TIMESTAMP(NULL),TIMESTAMP(NULL) )]
#insert what is not in the target table
WHEN NOT MATCHED THEN
INSERT(IdPersona,_loadingDate,_lastModifiedDate ) VALUES (S.IdPersona,S.Status,[( IdProperty,Value,TIMESTAMP(NULL))],TIMESTAMP(NULL));
#insert new values when IdPersona="A"
#you will see the result won't be aggregated properly
INSERT INTO sample.DIM_PERSONA_test2(IdPersona,_lastModifiedDate)
SELECT IdPersona,TIMESTAMP(NULL) from sample.PERSONA_test2
where IdPersona = "A";
#replace the above table to recriate the ARRAY<STRUCT<>>
CREATE OR REPLACE TABLE sample.DIM_PERSONA_FINAL_test2 AS(
SELECT t1.*,t2._loadingDate,t2._lastModifiedDate
FROM( SELECT a.IdPersona,a.Status,ARRAY_AGG(STRUCT( Properties.Id as Id,Properties.Value as Value,Properties._loadingDate,Properties._lastModifiedDate AS _lastModifiedDate)) AS Properties
FROM sample.DIM_PERSONA_test2 a,UNNEST(Properties) as Properties
GROUP BY 1,2
ORDER BY a.IdPersona)t1 LEFT JOIN sample.DIM_PERSONA_test2 t2 USING(IdPersona)
)
输出,
请注意,在更新 ARRAY