问题描述
我试图从我创建的目录文件中获取数据集列表,并将其作为单个节点的输入传递,以将它们组合起来,并最终使用kedro-airflow插件在气流上运行管道
这可以在kedro运行的cli上运行,但是似乎无法通过气流,我不确定为什么:
#my_pipeline/pipeline.py
def create_pipeline(**kwargs):
conf_loader = ConfigLoader(['conf/base'])
conf_catalog = conf_loader.get('catalog-a*')
datasets = [key for key,value in conf_catalog.items()]
return Pipeline([
node(
func=combine_data,inputs=datasets,outputs="combined_data",name="combined_data"
),...#other nodes
])
我遇到的气流错误看起来像这样: 断线:给定的配置路径都不存在 或不是有效目录:“ conf / base”
这肯定是一个Kedro配置加载器错误,但是我似乎无法弄清楚为什么通过气流运行管道时唯一的错误发生了。根据我一直在阅读的内容,建议不要在代码API中混用。这是传递数据集列表的正确方法吗?
编辑
我的目录基本上是SQL查询数据集的列表:
dataset_1:
type: pandas.sqlQueryDataSet
sql: select * from my_table where created_at >= '2018-12-21 16:00:00' and partner_id=1
credentials: staging_sql
dataset_2:
type: pandas.sqlQueryDataSet
sql: select * from my_table where created_at >= '2019-08-15 11:55:00' and partner_id=2
credentials: staging_sql
解决方法
我认为这可能会失败,因为kedro run从其根目录运行该目录,该目录可以找到conf / base,但是create_pipeline函数位于my_pipeline
目录下,因此kedro ConfigLoader无法找到该目录。
我认为过去我做过的另一种方法是像这样传递catalog: DataCatalog
:
def create_pipeline(catalog: DataCatalog = None,* *kwargs) -> Pipeline:
然后您可以迭代或执行以下操作:
datasets = catalog.datasets
。