问题描述
我为我的 ETL 管道设置了 Apache Nifi,并希望使用 Apache Airflow 启动(并稍后监控)特定处理器。
我看到了在气流 DAG 中实现这一目标的两种方法:
我查看了官方的气流文档,知道如何使用 Pythonoperator 编写(基本的)DAG:
from airflow import DAG
from airflow.operators.python import Pythonoperator
from airflow.utils.dates import days_ago
dag = DAG(
dag_id='python_nifi_operator',schedule_interval=None,start_date=days_ago(2),tags=['example'],)
def generate_flow_file():
"""Generate and insert a flow file"""
# connect to Nifi
pass
# access processor
pass
# create flow file
pass
# insert flow file
pass
return 'Success-message for the log'
run_this = Pythonoperator(
task_id='generate_a_custom_flow_file',python_callable=generate_flow_file,dag=dag,)
问题是:如何使用 Python 生成流文件?我一直在寻找一个库,但我只找到了其他带有代码摘录的 stackoverflow 帖子,这些帖子对我没有帮助,我可以甚至找不到他们使用的软件包的文档。欢迎提供任何提示/完整的代码示例/链接。
解决方法
没有用于“生成”FlowFile 的 API,拥有一个也没有多大意义。
也就是说,您可以使用 GenerateFlowFile 处理器并使用 REST API 停止/启动它 - 之前有问题询问如何使用 API 执行此操作 https://nifi.apache.org/docs/nifi-docs/rest-api/index.html https://pypi.org/project/nipyapi/
或者您可以在端点上监听仅用于 Airflow 的 ListenHTTP/HandleHttpRequest,您可以在 Python 中通过向配置的端点发送空的 HTTP 请求来触发,从而生成 FlowFile