从 Prefect 中的 DockerRun 访问 Google 凭据密钥

问题描述

我已经开始与级长合作,我正在尝试将我的结果保存到 Google 云存储:

import prefect
from prefect.engine.results import GCSResult
from prefect.run_configs import DockerRun,LocalRun
from prefect.storage import Docker,Local

@prefect.task(checkpoint=True,result=GCSResult(bucket="redacted"))
def task1():
    return 1


storage = Local(...)
run_config = LocalRun()

with prefect.Flow(
    "myflow",storage=storage,run_config=run_config
) as flow:
    results = task1()

flow.run()

如果我将 GOOGLE_APPLICATION_CREDENTIALS 环境变量设置为密钥,则一切正常。

但是,在尝试对我的流程进行 dockerize 时,我遇到了一些困难:

storage = Docker(...)
run_config = DockerRun(dockerfile="DockerFile")

with prefect.Flow(
    "myflow",run_config=run_config
) as flow:
    ... # Same deFinition as prevIoUsly

flow.register()

在这种情况下,当尝试使用 docker 代理运行我的流程时(无论是在注册流程的同一台机器上还是其他机器上,我都会收到此错误):

google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials.
Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. 
For more @R_519_4045@ion,please see https://cloud.google.com/docs/authentication/getting-started

the documentation 之后,我尝试在我的 Prefect 云上设置 GCP_CREDENTIALS 机密。但无济于事,我仍然遇到相同的错误

我也尝试将结果保存在单独的 GCSUpload 任务中,但我仍然遇到相同的错误

我看到的一个解决方案是通过 DockerFile 在我的 docker 映像中打包凭据,但是我觉得这应该是我应该使用 Prefect 机密的用例。

解决方法

我已经找到了使用 PrefectSecret task 检索凭据的方法。

我必须创建一个额外的 GCSUpload 任务,该任务将 task1 的结果直接保存在 GCS 中。

我的最终代码如下所示:


import prefect
from prefect.tasks.gcp.storage import GCSUpload
from prefect.tasks.secrets import PrefectSecret
from prefect.run_configs import DockerRun
from prefect.storage import Docker

retrieve_gcp_credentials = PrefectSecret("GCP_CREDENTIALS")


@prefect.task(checkpoint=True,result=GCSResult(bucket="redacted"))
def task1():
    return "1"

save_results_to_gcp = GCSUpload(bucket="redacted")

storage = Docker()
run_config = DockerRun()

with prefect.Flow(
    "myflow",storage=storage,run_config=run_config
) as flow:
    credentials = retrieve_gcp_credentials()
    results = task1()
    save_results_to_gcp(results,credentials=credentials)

flow.run()

(请注意,我还必须更改 task1 返回值的类型,因为任务只能上传字符串或字节)

这对于我的用例来说已经足够好了(只需将结果保留在 GCS 中),但如果有人知道如何使用 GCSResult,我会留下这个问题,因为它对缓存也很有用。