问题描述
我们正在使用 @dsl.containerop
在 kubeflow 管道中定义多个组件。
需求涉及两个步骤。
-
首先我们需要运行一个下载任务,它接受一个输入
url
并下载容器内的文件。 -
我们需要使用第一步生成的文件并运行一个python程序——这将在几秒钟的containerop中完成。
示例代码如下。
@dsl.component
def download(url: str,output_file: OutputPath(str)):
return dsl.ContainerOp(
name='Download',image='busybox:latest',command=["sh","-c"],arguments=["wget %s " % url,output_file)],)
上面提到的代码将使用
调用download_task = download(url=<URL>")
根据组件规范 https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/ - 不需要提及输出路径。
https://github.com/kubeflow/pipelines/blob/d106a6533bf4e1cbda4364560bc7526cb67d4eb2/samples/tutorials/Data%20passing%20in%20python%20components/Data%20passing%20in%20python%20components%20-%20Files.py#L69 - @func_to_container_op
- 我们可以看到一种使用 OutputPath 类型获取输出的方法。
有没有办法在 dsl.containerop
中实现这一点。我们不想使用 file_outputs
硬编码输出路径。
解决方法
您不能在 ContainerOp 中执行此操作,这是 ContainerOp 已被弃用的原因之一,请参阅 https://github.com/kubeflow/pipelines/pull/4166。
建议:
- 按照 https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/ 构建您的可重用组件 yaml。
- 如果您更喜欢为一次性组件内联组件 yaml,您可以通过
const defn = (functionName,args,body) => { const f = new Function(` return function ${ functionName } (${ args.join(',') }) { return ${body[0].name}(${ body.slice(1).join(',') }) }`); return f(body[0]); };
方法加载它,请参阅 this example pipeline。
以下是编写下载器组件的方法:
(将其另存为 Exception in thread "main" org.openqa.selenium.SessionNotCreatedException: Could not start a new session. Possible causes are invalid address of the remote server or browser start-up failure.
)然后执行 component.yaml
download_op = load_component_from_file('component.yaml')
还有一个使用 name: Download data
inputs:
- {name: Url,type: URI}
options given to the curl bprogram. See https://curl.haxx.se/docs/manpage.html'}
outputs:
- {name: Data}
implementation:
container:
image: curlimages/curl
command:
- sh
- -exc
- |
url="$0"
output_path="$1"
mkdir -p "$(dirname "$output_path")"
curl --get "$url" --output "$output_path"
- inputValue: Url
- outputPath: Data
下载数据然后对该数据进行训练的管道示例:XGBoost pipeline
附言已经存在从网络下载数据的组件:
curl