带有NESTED + REPEATED字段的BigQuery MERGE语句

问题描述

我需要使用经典的扁平表在BigQuery中执行合并语句,将包含嵌套和重复字段的表作为目标,而我在理解其工作方式方面遇到了麻烦。 Google的示例使用直接值,因此我的语法不太清楚。

使用此示例:

CREATE OR REPLACE TABLE
  mydataset.DIM_PERSONA (
    IdPersona STRING,Status STRING,Properties ARRAY<STRUCT<
      Id STRING,Value STRING,_loadingDate TIMESTAMP,_lastModifiedDate TIMESTAMP
    >>,_loadingDate TIMESTAMP NOT NULL,_lastModifiedDate TIMESTAMP
);

INSERT INTO mydataset.DIM_PERSONA
values
  ('A','KO',[('FamilyMembers','2',CURRENT_TIMESTAMP(),TIMESTAMP(NULL))],TIMESTAMP(NULL)),('B','4',('Pets','1',NULL)],TIMESTAMP(NULL))
;

CREATE OR REPLACE TABLE
  mydataset.PERSONA (
    IdPersona STRING,IdProperty STRING,Value STRING
);

INSERT INTO mydataset.PERSONA
VALUES('A','OK','Pets','3'),'FamilyMembers','5'),('C','2')

目标是:

  1. 更新IdPersona ='A',在“属性”中添加一个新元素,然后 更改状态
  2. 更新IdPersona ='B',更新现有元素 在属性
  3. 插入IdPersona ='C'

此INSERT起作用:

MERGE INTO mydataset.DIM_PERSONA TRG
USING (
  SELECT
    IdPersona,Status,ARRAY(
      SELECT AS STRUCT
        IdProperty,Value,TIMESTAMP(NULL)
    ) Properties,TIMESTAMP(NULL)
  FROM mydataset.PERSONA
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (IdPersona,Properties,TIMESTAMP(NULL))

但是我想在INSERT子句中构建嵌套/重复字段,因为对于UPDATE,我还需要(我认为)通过比较TRG和SRC的值来执行“ SELECT AS STRUCT * REPLACE”。 这不起作用:

MERGE INTO mydataset.DIM_PERSONA TRG
USING (
  SELECT
    *
  FROM mydataset.PERSONA
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (
  IdPersona,ARRAY(
    SELECT AS STRUCT
      IdProperty,TIMESTAMP(NULL)
  ),TIMESTAMP(NULL)
)

我得到 “ INSERT子句不支持相关子查询。”

即使使用第一个选项,我也无法在UPDATE中引用TRG.properties。

WHEN MATCHED THEN
UPDATE
SET Properties = ARRAY(
  SELECT AS STRUCT p_SRC.*
    REPLACE (IF(p_SRC.IdProperty=p_TRG.id AND p_SRC.Value<>p_TRG.Value,p_SRC.Value,p_TRG.Value) AS Value)
  FROM SRC.Properties p_SRC,TRG.Properties p_TRG
)

虽然这是错误的。

解决这个问题的一种方法,如我所见,是预先联接USING子句中的所有内容,因此在此进行所有替换,但是对于merge语句来说,这是非常错误的。

有人可以帮我解决这个问题吗? :\

解决方法

因此,尽管我仍然希望有另一种方法,但我想分享一个可能的解决方案。 如前所述,我使用CTE和FULL OUTER JOIN预先计算了所需的内容,因此重新创建了以后需要的结构数组(表会相对较小,因此我可以负担得起)。

MERGE INTO mydataset.DIM_PERSONA TRG
USING (
  WITH NEW_PROPERTIES AS (
    SELECT
      COALESCE(idp,IdPersona) IdPersona,ARRAY_AGG((
        SELECT AS STRUCT
          COALESCE(idpro,Id) IdProperty,COALESCE(vl,Value) Value,COALESCE(_loadingDate,CURRENT_TIMESTAMP) _loadingDate,IF(idp=IdPersona,CURRENT_TIMESTAMP,TIMESTAMP(NULL)) _lastModifiedDate
      )) Properties
    FROM (
      SELECT DIP.IdPersona,DIP.Status,DIP_PR.*,PER.IdPersona idp,PER.Status st,PER.IdProperty idpro,PER.Value vl
      FROM `clean-yew-281811.mydataset.DIM_PERSONA` DIP
      CROSS JOIN UNNEST(DIP.Properties) DIP_PR
      FULL OUTER JOIN mydataset.PERSONA PER
        ON  DIP.IdPersona=PER.IdPersona
        AND DIP_PR.Id=PER.IdProperty 
    )
    GROUP BY IdPersona
  )
  
  SELECT
    IdPersona,'subquery to do here' Status,NP.Properties
  FROM (SELECT DISTINCT IdPersona FROM mydataset.PERSONA) PE
  LEFT JOIN NEW_PROPERTIES NP USING (IdPersona)
) SRC ON TRG.IdPersona=SRC.IdPersona
WHEN NOT MATCHED THEN
INSERT VALUES (IdPersona,Status,Properties,CURRENT_TIMESTAMP(),TIMESTAMP(NULL))
WHEN MATCHED THEN
UPDATE
SET
  TRG.Status = SRC.Status,TRG.Properties = SRC.Properties,TRG._lastModifiedDate = CURRENT_TIMESTAMP()

这可以工作,但是我几乎避免使用语法来更新结构数组,因为我正在做的是重建和替换操作。希望有人可以提出更好的方法。

,

此外,虽然您没有提供所需的输出,但我仍然能够根据您描述的目标和您的代码以及您提供的示例数据来创建查询。

遵循以下目标:

  1. 更新IdPersona ='A',在“属性”中添加新元素并更改状态
  2. 更新IdPersona ='B',更新属性中的现有元素
  3. 插入IdPersona ='C'

我没有执行替换和重建操作,而是使用了:

  • MERGE;以执行更新并插入新行,例如IdPersona =“ C”
  • INSERT:在合并中,无法将INSERT与WHEN MATCHED一起使用。因此,为了在IdPerson =“ A”时添加新的属性,在MERGE操作之后使用了此方法。
  • CREATE TABLE:使用INSERT后,由于我们未使用WHEN MATCHED,因此未聚合IdPersona =“ A”时的新属性。因此,将最终表DM_PERSONA替换为正确汇总结果。
  • LEFT JOIN:为了添加未汇总到 ARRAY > 中的字段 _loadingDate 和* _lastModifiedDate *。

下面是带有适当注释的查询:

#first step update current values and insert new IdPersonas
MERGE sample.DIM_PERSONA_test2 T
USING sample.PERSONA_test2 S
ON T.IdPersona = S.IdPersona

#update A but not insert
WHEN MATCHED AND T.IdPersona ="A" THEN
UPDATE SET STATUS = "OK"

#update B
WHEN  MATCHED AND T.IdPersona ="B" THEN
UPDATE SET Properties = [( S.IdPersona,S.IdProperty,TIMESTAMP(NULL),TIMESTAMP(NULL) )]

#insert what is not in the target table
WHEN NOT MATCHED THEN
INSERT(IdPersona,_loadingDate,_lastModifiedDate ) VALUES (S.IdPersona,S.Status,[( IdProperty,Value,TIMESTAMP(NULL))],TIMESTAMP(NULL));

#insert new values when IdPersona="A"
#you will see the result won't be aggregated properly
INSERT INTO sample.DIM_PERSONA_test2(IdPersona,_lastModifiedDate)
SELECT IdPersona,TIMESTAMP(NULL) from sample.PERSONA_test2
where IdPersona = "A";

#replace the above table to recriate the ARRAY<STRUCT<>>
CREATE OR REPLACE TABLE sample.DIM_PERSONA_FINAL_test2 AS(
SELECT t1.*,t2._loadingDate,t2._lastModifiedDate 
FROM( SELECT a.IdPersona,a.Status,ARRAY_AGG(STRUCT( Properties.Id as Id,Properties.Value as Value,Properties._loadingDate,Properties._lastModifiedDate AS _lastModifiedDate)) AS Properties
FROM sample.DIM_PERSONA_test2 a,UNNEST(Properties) as Properties
GROUP BY 1,2
ORDER BY a.IdPersona)t1 LEFT JOIN sample.DIM_PERSONA_test2 t2 USING(IdPersona)
)

输出,

enter image description here

请注意,在更新 ARRAY > 时,这些值包含在 [()] 中。最后,请注意,有两个 IdPersona =“ A” ,因为需要 _loadingDate ,所以不能为 NULL ,并且由于 CURRENT_TIMESTAMP(),此字段有两个不同的值。因此,有两个不同的记录。