使用 Apache Airflow 启动 Apache Nifi - 如何生成流文件

问题描述

我为我的 ETL 管道设置了 Apache Nifi,并希望使用 Apache Airflow 启动(并稍后监控)特定处理器。

我看到了在气流 DAG 中实现这一目标的两种方法

  1. 从头开始生成文件并将其插入到 Nifi 队列/处理器中
  2. 触发“generate-flow-file-processor”以创建一个文件,然后将其插入队列

我查看了官方的气流文档,知道如何使用 Pythonoperator 编写(基本的)DAG:

from airflow import DAG
from airflow.operators.python import Pythonoperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='python_nifi_operator',schedule_interval=None,start_date=days_ago(2),tags=['example'],)

def generate_flow_file():
    """Generate and insert a flow file"""
    # connect to Nifi
    pass
    # access processor
    pass
    # create flow file
    pass 
    # insert flow file 
    pass 
    return 'Success-message for the log'

run_this = Pythonoperator(
    task_id='generate_a_custom_flow_file',python_callable=generate_flow_file,dag=dag,)

问题是:如何使用 Python 生成文件我一直在寻找一个库,但我只找到了其他带有代码摘录的 stackoverflow 帖子,这些帖子对我没有帮助,我可以甚至找不到他们使用的软件包的文档。欢迎提供任何提示/完整的代码示例/链接

解决方法

没有用于“生成”FlowFile 的 API,拥有一个也没有多大意义。

也就是说,您可以使用 GenerateFlowFile 处理器并使用 REST API 停止/启动它 - 之前有问题询问如何使用 API 执行此操作 https://nifi.apache.org/docs/nifi-docs/rest-api/index.html https://pypi.org/project/nipyapi/

或者您可以在端点上监听仅用于 Airflow 的 ListenHTTP/HandleHttpRequest,您可以在 Python 中通过向配置的端点发送空的 HTTP 请求来触发,从而生成 FlowFile