问题描述
以前的问题
此问题已在 here、here 和 here 之前报告过,但是,我怀疑这可能是因为调用了 google 云存储。
前提/问题
以下代码位于 Google Cloud Composer 实例的 DAG 文件夹中。
以下代码块基于字符串列表“动态”生成 DAG,并且将成功运行,生成名称为“new_dummy_ohhel
”和“{{”的 2 个 DAG 1}}”。这些 DAG 可以访问并且可以使用。
new_dummy_hello
现在,有问题且奇怪的是,当对 Google Cloud Storage 的简单调用替换字符串列表时,仍然会创建 DAG,但在尝试访问它们时会产生错误。如果将变量 import datetime as dt
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from google.cloud.storage import Client as Client_Storage
list_strings = ["ohhello","hellothere"]
# ^ Variable of importance
for string_val in list_strings:
name_dag = f"new_dummy_{string_val[:5]}"
# Just get the first 5 characters (this is important later)
dag = DAG(
name_dag,default_args={
"owner": "airflow","start_date": dt.datetime.Now() - dt.timedelta(days=1),# ^ yesterday
"depends_on_past": False,"email": [""],"email_on_failure": False,"email_on_retry": False,"retries": 1,"retry_delay": dt.timedelta(minutes=5),},schedule_interval=None,)
with dag:
op_dummy_start = DummyOperator(task_id="new-dummy-task-start")
op_dummy_end = DummyOperator(task_id="new-dummy-task-end")
op_dummy_start >> op_dummy_end
globals()[name_dag] = dag
替换为以下内容,则会发生此错误。
list_strings
假设我在存储桶“some_bucket_name”中有文件“something.json”和“omgwhy.json”,那么将创建两个不可访问且不可执行的 DAG:“list_strings = [
x.name
for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
”和“{{1” }}”。 (仅获得前五 (5) 个字母,因此不包括 new_dummy_somet
。)这表明对存储的调用成功,但 DAG 仍然“似乎丢失”。
即使代码立即像下面这样覆盖了该列表,DAG“似乎丢失”的错误仍然会出现(注意 DAG 将是“new_dummy_omgwh
”和“.
” ):
new_dummy_ohhel
tl;博士与假设
当对 Google Cloud Storage 进行任何调用(包括成功和未使用的调用)时,文件中创建的所有 DAG 都会成功显示,但会表明 new_dummy_hello
>成功调用会导致此问题。
尝试过的解决方案
- 尝试重新启动整个 Google Cloud Composer 实例
- 尝试通过添加环境变量重新启动 Airflow 网络服务器
解决方法
Cloud Composer 中的托管 Web 服务器在与您环境的主要工作机器(使用在环境创建期间指定的服务帐户)不同的服务帐户下运行。这意味着,如果您打算从环境主存储桶以外的存储桶中读取数据,则需要使用类似的 ACL,否则 Web 服务器将无法从存储桶中读取数据。
在您的情况下,Airflow 调度程序可能会读取存储桶(使用环境的服务帐户),但 Web 服务器不能。调度程序将为 DAG 创建一个条目,但如果 Web 服务器无法在没有遇到异常的情况下解析定义文件,您将收到“DAG x is missing”。
如果您觉得上述情况正确,您可以通过调整 Cloud Storage 存储分区 ACL 或启用 DAG serialization 来解决此问题。序列化消除了 Web 服务器解析/执行定义文件的需要,并将其全部交给调度程序,因此它也可以解决您的问题。