问题描述
我有一个要传递给xcom的对象,希望从操作员那里读取。
这是我的接线员:
load_csv = GCSToBigQueryOperator(
task_id='gcs_to_bigquery',bucket='test',source_objects=['{{ execution_date.strftime("%Y-%m") }}'],providers=True,destination_project_dataset_table=f'{stg_dataset_name}' + '.' + '{{ execution_date.strftime("%Y_%m") }}',schema_fields={{ti.xcom_pull(task_ids='print_the_context')}},write_disposition='WRITE_TRUNCATE',provide_context=True,dag=dag)
我想将xcom的值传递给schema_fields
变量。
我正在尝试使用以下模板{{ti.xcom_pull(task_ids='print_the_context')}}
访问对象,但是我有it is not defined
...
这是怎么了?
解决方法
不幸的是,目前无法实现
-
要想在
operator
参数中使用macros
,必须在运算符的源代码中将相应字段定义为template_fields
-
但是在
GCSToBigQueryOperator
的源代码中,我看到了schema_fields
is missing fromtemplate_fields
template_fields = ('bucket','source_objects','schema_object','destination_project_dataset_table')
因此,您无法通过XCOM模板为schema_fields
提供价值
也就是说,尽管我不知道GCSToBigQueryOperator
的内部结构,但我可以看到2种可能的解决方案
-
(直接)改为使用
schema_object
field:param schema_object: If set,a GCS object path pointing to a .json file that contains the schema for the table. (templated) Parameter must be defined if 'schema_fields' is null and autodetect is False. :type schema_object: str
-
您可以尝试对其进行子类化,并将
中schema_fields
包含在template_fields
有趣的读物