如何使用 Kedro 在云上读/写/同步数据

问题描述

简而言之:如何在本地和云端保存文件,同样如何设置为从本地读取。

更长的描述:有两种场景,1)构建模型 2)通过 API 服务模型。在构建模型时,进行一系列分析以生成特征和模型。结果将写入本地。最后,所有内容都将上传到 S3。为了提供数据,首先将下载第一步生成的所有必需文件

我很好奇我在这里如何利用 Kedro。也许我可以为每个文件 conf/base/catalog.yml 定义两个条目,一个对应于本地版本,第二个对应于 S3。但当我处理 20 个文件时,这可能不是最有效的方法

或者,我可以使用自己的脚本将文件上传到 S3 并从 Kedro 中排除同步!换句话说,Kedro 对云上存在副本这一事实视而不见。也许这种方法不是对 Kedro 最友好的方法

解决方法

不太一样,但我的回答 here 可能有用。

我建议在您的情况下,最简单的方法确实是定义两个目录条目并将 Kedro 保存到它们两个(并从本地加载以进一步加快速度),这为您提供了最大的灵活性,尽管我承认不是最漂亮的。

为了避免您的所有节点函数都需要返回两个值,我建议将装饰器应用于您使用特定标签标记的特定节点,例如 <template> <span><div>{{ $t("message.hello") }}</div></span> </template> 从以下脚本中汲取灵感(从我的一个同事):

tags=["s3_replica"]

上面是一个钩子,所以你可以把它放在你项目中的 class S3DataReplicationHook: """ Hook to replicate the output of any node tagged with `s3_replica` to S3. E.g. if a node is defined as: node( func=myfunction,inputs=['ds1','ds2'],outputs=['ds3','ds4'],tags=['tag1','s3_replica'] ) Then the hook will expect to see `ds3.s3` and `ds4.s3` in the catalog. """ @hook_impl def before_node_run( self,node: Node,catalog: DataCatalog,inputs: Dict[str,Any],is_async: bool,run_id: str,) -> None: if "s3_replica" in node.tags: node.func = _duplicate_outputs(node.func) node.outputs = _add_local_s3_outputs(node.outputs) def _duplicate_outputs(func: Callable) -> Callable: def wrapped(*args,**kwargs): outputs = func(*args,**kwargs) return (outputs,) + (outputs,) return wrapped def _add_local_s3_outputs(outputs: List[str]) -> List[str]: return outputs + [f'{o}.s3' for o in outputs] 文件(或你想要的任何地方),然后将它导入你的 hooks.py 文件并放置:

settings.py

在您的 from .hooks import ProjectHooks,S3DataReplicationHook hooks = (ProjectHooks(),S3DataReplicatonHook()) 中。

您可以更巧妙地处理输出命名约定,使其仅复制某些输出(例如,您可能同意所有以 settings.py 结尾的目录条目也必须具有相应的 .local条目,然后相应地改变该钩子中 .s3outputs,而不是对每个输出都这样做。

如果您想更聪明,可以使用 node 钩子将相应的 S3 条目注入目录中,而不是再次按照命名将数据集的 S3 版本手动写入目录中您选择的约定。虽然我认为从长远来看,编写 S3 条目更具可读性。

,

我能想到两种方法。一种更简单的方法是对云和本地都使用 --env conf。 https://kedro.readthedocs.io/en/latest/04_kedro_project_setup/02_configuration.html#additional-configuration-environments

conf
├── base
│   └── 
├── cloud
│   └── catalog.yml
└── my_local_env
    └── catalog.yml

您可以根据要使用的环境调用 kedro run --env=cloudkedro run --env=my_local

另一种更高级的方法是使用 TemplatedConfigLoader https://kedro.readthedocs.io/en/stable/kedro.config.TemplatedConfigLoader.html

conf
├── base
│   └── catalog.yml
├── cloud
│   └── globals.yml (contains `base_path:s3-prefix-path`)
└── my_local
    └── globals.yml (contains `base_path:my_local_path`)

catalog.yml中,你可以这样引用base_path

my_dataset:
    filepath: s3:${base_path}/my_dataset

您可以根据要使用的环境调用 kedro run --env=cloudkedro run --env=my_local