问题描述
我有一个简单的 dag (Airflow 10.10.12),我只想显示在路径上上传的当前 blob。然后我想下载并处理相同的 blob。这是dag
import datetime
import json
import os
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'start_date': datetime.datetime(2000,1,1),'email_on_failure': False,'email_on_retry': False,'labels': {"app": "airflow"},'in_cluster': True,'get_logs': True,'startup_timeout_seconds': 60,'is_delete_operator_pod': True,'retries': 0
}
az_config = {
'az_container': 'reliability','az_blob_path': 'test/gpgfile','wasb_conn_id': '*****'
}
with DAG("reliability_test",catchup=False,schedule_interval='0 * * * *',default_args=default_args) as dag:
# Use wasb_sensor to check for blob upload
# On blob upload do:
reliability_task1 = BashOperator(
task_id='reliability_task1',bash_command='echo "Hello World from Task 1"',dag=dag)
reliability_task2 = BashOperator(
task_id='reliability_task2',bash_command='echo "New blob found: {}"'.format(blob_name)),dag=dag)
reliability_task1 >> reliability_task2
我一直在研究其中有两个 Blob 传感器的 wasb_sensor。我找不到与这些传感器相关的任何示例。我还研究了 HTTP REST Api,但看起来有点太多了。任何帮助表示赞赏。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)