Airflow:在 Azure Blob 上传上触发 dag

问题描述

我有一个简单的 dag (Airflow 10.10.12),我只想显示在路径上上传的当前 blob。然后我想下载并处理相同的 blob。这是dag

import datetime
import json
import os
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'start_date': datetime.datetime(2000,1,1),'email_on_failure': False,'email_on_retry': False,'labels': {"app": "airflow"},'in_cluster': True,'get_logs': True,'startup_timeout_seconds': 60,'is_delete_operator_pod': True,'retries': 0
}

az_config = {
    'az_container': 'reliability','az_blob_path': 'test/gpgfile','wasb_conn_id': '*****'
}

with DAG("reliability_test",catchup=False,schedule_interval='0 * * * *',default_args=default_args) as dag:

    # Use wasb_sensor to check for blob upload
    # On blob upload do: 
    reliability_task1 = BashOperator(
        task_id='reliability_task1',bash_command='echo "Hello World from Task 1"',dag=dag)

    reliability_task2 = BashOperator(
        task_id='reliability_task2',bash_command='echo "New blob found: {}"'.format(blob_name)),dag=dag)
    
    reliability_task1 >> reliability_task2

我一直在研究其中有两个 Blob 传感器的 wasb_sensor。我找不到与这些传感器相关的任何示例。我还研究了 HTTP REST Api,但看起来有点太多了。任何帮助表示赞赏。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)