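Problem description
Source and inspiration here: Clean your warehouse of old and deprecated models
I'm trying to convert the following dbt macro so that it does two things:
- Runs on BigQuery
- Drops "old" relations from my multiple target schemas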
SELECT schema_name FROM `my-project.region-us.INFORMATION_SCHEMA.SCHEMATA` order by schema_name desc;
schema_name
dbt_dev
dbt_dev_stage
dbt_dev_mart
dbt_dev_analytics
dbt_prod
dbt_prod_stage
dbt_prod_mart
dbt_prod_analytics
etc...
My "adapted" macro looks like this:
{% macro drop_old_relations(dryrun=False) %}
{% if execute %}
{% set current_models=[] %}
{% for node in graph.nodes.values()
| selectattr("resource_type","in",["model","seed","snapshot"])%}
{% do current_models.append(node.name) %}
{% endfor %}
{% endif %}
{% set cleanup_query %}
WITH MODELS_TO_DROP AS (
SELECT
CASE
WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
END AS RELATION_TYPE,CONCAT( TABLE_CATALOG,".",{{ target.schema }},TABLE_NAME) AS RELATION_NAME
FROM
{{ target.database }}.{{ target.schema }}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = {{ target.schema }}
AND TABLE_NAME NOT IN
({%- for model in current_models -%}
'{{ model.upper() }}'
{%- if not loop.last -%},{% endif %}
{%- endfor -%}))
SELECT
'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
FROM
MODELS_TO_DROP
{% endset %}
{% do log(cleanup_query,info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}
{% if drop_commands %}
{% if dryrun | as_bool == False %}
{% do log('Executing DROP commands...',True) %}
{% else %}
{% do log('Printing DROP commands...',True) %}
{% endif %}
{% for drop_command in drop_commands %}
{% do log(drop_command,True) %}
{% if dryrun | as_bool == False %}
{% do run_query(drop_command) %}
{% endif %}
{% endfor %}
{% else %}
{% do log('No relations to clean.',True) %}
{% endif %}
{%- endmacro -%}
I'm currently running into a problem where the macro doesn't recognize some of my target schemas:
dbt run-operation drop_old_relations --args "{dryrun: True}"
Encountered an error while running operation: Database Error
Unrecognized name: dbt_dev at [14:32]
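Alternatively, I'd be happy to go the route of something like a schema argument and then loop over the schemas, for example from a run hook: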
dbt run-operation drop_old_relations --args "{schema: dbt_dev_mart,dryrun: True}"
on-run-start:
- "{% for schema in schemas%}drop_old_relations({{ schema }},False);{% endfor%}"
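Solution
Great question, I love seeing people adapt this to different warehouses. I've split my answer into two parts.
1. Getting it to work on BigQuery
I was able to get this working on just my target schema first. It looks like we only need to adjust some of the SQL in cleanup_query.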
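To make the adjustment concrete, here is a rough sketch of what the filter renders to before and after the change. It assumes target.schema is dbt_dev (taken from the error message in the question), and it is trimmed down to the WHERE clause rather than the full cleanup query.

```sql
-- Original template:  WHERE TABLE_SCHEMA = {{ target.schema }}
-- Rendered SQL (assuming target.schema = dbt_dev). The schema name is emitted
-- as a bare identifier, so BigQuery tries to resolve it as a column and
-- reports "Unrecognized name: dbt_dev".
SELECT TABLE_NAME
FROM dbt_dev.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = dbt_dev;

-- Revised template:   WHERE TABLE_SCHEMA = '{{ schema }}'
-- Rendered SQL. The schema name is now a string literal, so the comparison
-- is against a value instead of a column.
SELECT TABLE_NAME
FROM dbt_dev.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'dbt_dev';
```

The same quoting applies to the CONCAT call that builds RELATION_NAME, which is why the schema appears there as '{{ schema }}' as well.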
drop_old_relations.sql
{% macro drop_old_relations(schema=target.schema, dryrun=False) %}
  {# Get the models that currently exist in dbt #}
  {% if execute %}
    {% set current_models = [] %}
    {% for node in graph.nodes.values()
        | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) %}
      {% do current_models.append(node.name) %}
    {% endfor %}
  {% endif %}
  {# Run a query to create the drop statements for all relations in BQ that are NOT in the dbt project #}
  {% set cleanup_query %}
    WITH MODELS_TO_DROP AS (
      SELECT
        CASE
          WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
          WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
        END AS RELATION_TYPE,
        CONCAT('{{ schema }}', '.', TABLE_NAME) AS RELATION_NAME
      FROM
        {{ schema }}.INFORMATION_SCHEMA.TABLES
      WHERE TABLE_SCHEMA = '{{ schema }}'
        AND UPPER(TABLE_NAME) NOT IN
          ({%- for model in current_models -%}
            '{{ model.upper() }}'
            {%- if not loop.last -%},{% endif %}
          {%- endfor -%})
    )
    SELECT
      'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' AS DROP_COMMANDS
    FROM
      MODELS_TO_DROP
  {% endset %}
  {% set drop_commands = run_query(cleanup_query).columns[0].values() %}
  {# Execute each of the drop commands for each relation #}
  {% if drop_commands %}
    {% if dryrun | as_bool == False %}
      {% do log('Executing DROP commands...', True) %}
    {% else %}
      {% do log('Printing DROP commands...', True) %}
    {% endif %}
    {% for drop_command in drop_commands %}
      {% do log(drop_command, True) %}
      {% if dryrun | as_bool == False %}
        {% do run_query(drop_command) %}
      {% endif %}
    {% endfor %}
  {% else %}
    {% do log('No relations to clean.', True) %}
  {% endif %}
{%- endmacro -%}
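2. Cleaning up multiple schemas
Rather than adjusting the macro above to clean up every schema, I think it is more helpful to use a second macro that fetches all the schemas, loops over them, and calls the macro above for each one.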
clean_all_schemas.sql
{% macro clean_all_schemas() %}
  {# List every schema (dataset) in the target project and region #}
  {% set get_schemas_query %}
    SELECT schema_name FROM `{{ target.project }}.region-{{ target.location }}.INFORMATION_SCHEMA.SCHEMATA` order by schema_name desc;
  {% endset %}
  {% set schemas = run_query(get_schemas_query).columns[0].values() %}
  {# Call drop_old_relations once per schema #}
  {% for schema in schemas %}
    {% do log("Cleaning up " + schema + " schema", True) %}
    {{ drop_old_relations(schema=schema) }}
  {% endfor %}
{% endmacro %}
Commands
- dbt run-operation drop_old_relations - drops the old relations in the target schema
- dbt run-operation clean_all_schemas - drops the old relations across the schemas in the target BQ project
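One thing to keep in mind is that clean_all_schemas, as written, always calls drop_old_relations with its default dryrun=False and visits every schema the SCHEMATA query returns. If that is too aggressive, a possible variant is sketched below. The dryrun argument on clean_all_schemas and the STARTS_WITH prefix filter are my own additions for illustration, and the filter assumes that every dbt-managed schema name begins with target.schema (dbt_dev, dbt_dev_stage, dbt_dev_mart, and so on), which may not hold in your project.

```sql
{# Hypothetical variant: adds a dryrun argument and a schema prefix filter #}
{% macro clean_all_schemas(dryrun=False) %}
  {% set get_schemas_query %}
    SELECT schema_name
    FROM `{{ target.project }}.region-{{ target.location }}.INFORMATION_SCHEMA.SCHEMATA`
    {# Assumption: dbt-managed schemas all start with target.schema #}
    WHERE STARTS_WITH(schema_name, '{{ target.schema }}')
    ORDER BY schema_name DESC
  {% endset %}
  {% set schemas = run_query(get_schemas_query).columns[0].values() %}
  {% for schema in schemas %}
    {% do log("Cleaning up " + schema + " schema", True) %}
    {# Forward dryrun so a dry run only logs the DROP statements #}
    {{ drop_old_relations(schema=schema, dryrun=dryrun) }}
  {% endfor %}
{% endmacro %}
```

With this variant, dbt run-operation clean_all_schemas --args "{dryrun: True}" should only print the DROP statements for each matching schema, which is a reasonable first check before letting it execute anything.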
Let us know if this works for your use case :-)