问题描述
因此,我的问题是我在Airflow中构建ETL管道,但实际上首先要在Jupyter笔记本中开发和测试Extract,Transform和Load功能。因此,我最终总是在Airflow Python操作员代码和Jupyter笔记本之间来回复制粘贴,效率很低!我的直觉告诉我,所有这些都可以自动化。
基本上,我想在Jupyter中编写我的Extract,Transform和Load函数,并使其停留在那儿,同时仍在Airflow中运行管道,并显示出Extract,Transform和Load任务,并进行重试和所有其他功能。 Airflow提供的现成的东西。
Papermill可以对笔记本进行参数设置,但是我真的不认为这对我的情况有什么帮助。有人可以帮我把点子联系起来吗? ??
解决方法
单个主 Jupyter 笔记本,带有任意数量的从笔记本(用作模板),使用 papermill.execute_notebook
按顺序执行,应该足以自动化任何 ML 管道。
为了在管道阶段(从一个从笔记本到下一个)之间传递信息,可以使用另一个 Netflix 包 scrapbook
,它允许我们在从笔记本中记录 Python 对象(因为它们由 papermill
处理),然后从管道主机中的从属设备中检索这些对象(保存使用 scrapbook.glue
和读取 - scrapbook.read_notebook
)。
从任何已完成的阶段恢复也是可能的,但它需要将在前一阶段保存的必要输入存储在可从 master 访问的可预测位置(例如,在本地 master JSON 文件或 MLflow 中)。
也可以使用 cron 作业来安排主笔记本,例如from Kubernetes).
- 替代方案
由于管理成本(5 个容器,包括 2 个数据库),Airflow 对大多数 ML 团队来说可能是一种矫枉过正,而其他(非 Netflix)python 包要么需要更多样板(Luigi),要么需要额外的特权和自定义 docker 镜像对于执行者 (Elyra),而 Ploomber 将面临少数维护者的风险。
,[免责声明:我是上述开源项目的提交者之一。] 我们创建了 Elyra - 一组 JupyterLab 扩展 - 来简化这类工作.我们刚刚发布了 2.1 版,它提供了一个可视化编辑器,您可以使用它从笔记本和 Python 脚本(R 支持应该很快可用)组装管道,并在 Apache Airflow、Kubeflow Pipelines 或本地 JupyterLab 中运行它们。对于 Airflow(在 Kubernetes 上运行),我们创建了一个 custom operator 来负责内务管理和执行。我写了一篇关于它的摘要文章,你可以找到 here,如果你有兴趣尝试一下,我们有几个 introductory tutorials。
,可以按照您的建议通过Papermill在气流管道中使用Jupyter笔记本。但是,Airflow的优点之一是您可以将管道分成彼此独立的独立步骤,因此,如果您决定将整个管道写在一个Jupyter Notebook中,那将会失败使用气流的目的。
因此,假设您的每个离散 ETL步骤都位于单独的Jupyter Notebook中,则可以尝试以下操作:
- 为每个步骤创建一个Jupyter Notebook。例如,
copy_data_from_s3
,cleanup_data
,load_into_database
(3个步骤,每个笔记本一个)。 - 确保根据Papermill instructions对每个笔记本进行了参数设置。这意味着,在每个单元格中添加一个标记,以声明可以从外部进行参数设置的变量。
- 确保这些笔记本可由Airflow找到(例如,与DAG所在的文件夹位于同一文件夹中)
- 编写将使用Papermill参数化和运行笔记本的功能,每一步一个。例如:
import papermill as pm
# ...
# define DAG,etc.
# ...
def copy_data_from_s3(**context):
pm.execute_notebook(
"copy_data_from_s3_step.ipynb","copy_data_from_s3_step.ipynb"
parameters=dict(date=context['execution_date'])) # pass some context parameter if you need to
)
- 最后,将步骤设置为
PythonOperator
(尽管如果要运行Papermill from the command line,也可以使用BashOperator
)。要从上方匹配功能:
copy_data = PythonOperator(dag=dag,task_id='copy_data_task',provide_context=True,python_callable=copy_data_from_s3)
,
Airflow 有一个 papermill operator,但开发经验不是很好。 Airflow 中基于 Python 的 DAG 的主要问题之一是它们在同一个 Python 环境中执行,一旦您拥有多个 DAG,就会导致依赖性问题。 See this for more details。
如果您愿意尝试新工具,我建议您使用 Ploomber(免责声明:我是作者),它可以编排基于笔记本的管道(它在兜帽)。您可以在本地开发并导出到 Kubernetes 或 Airflow。
如果您想了解 Ploomber 中的项目是什么样的,请随时查看 examples repository。
,为什么您希望将 ETL 作业作为 jupyter notebook。你看到什么优势? Notebooks 通常用于构建带有实时数据的精美文档。 ETL 作业应该是在后台运行并自动化的脚本。
为什么这些作业不能是纯 python 代码而不是 notebook?
此外,当您使用 PapermillOperator 运行笔记本时,运行的输出将是另一个保存在某处的笔记本。不断检查这些输出文件不是那么友好。
我建议用普通 Python 编写 ETL 作业并使用 PythonOperator 运行它。这更简单,更易于维护。
如果你想使用笔记本的奇特功能,那就是另一回事了。