airflow 待测试

#coding=utf-8
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.python_operator import Pythonoperator
import airflow.utils


# 定义认参数
default_args = {
    owner: airflow,# 拥有者名称
        start_date: airflow.utils.dates.days_ago(1),#  第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
    email: [[email protected]],# 接收通知的email列表
    email_on_failure: True,# 是否在任务执行失败时接收邮件
    email_on_retry: True,# 是否在任务重试时接收邮件
    retries: 3,# 失败重试次数
    retry_delay: timedelta(seconds=5)  # 失败重试间隔
}

# 定义DAG
dag = DAG(
    dag_id=hello_world_args,# dag_id
    default_args=default_args,# 指定认参数
    # schedule_interval="00,*,*"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
#     schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
)


# 定义要执行的Python函数1
def hello_world_args_1(**context):
    current_time = str(datetime.today())
    with open(/tmp/hello_world_args_1.txt,a) as f:
        f.write(%s\n % current_time)
    assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常
    context[task_instance].xcom_push(key=sea1,value="seaseseseaaa")
    return "t2"

# 定义要执行的Python函数2
def hello_world_args_2(**context):
    sea = context[task_instance].xcom_pull(key="sea1")
    current_time = str(datetime.today())
    with open(/tmp/hello_world_args_2.txt,a) as f:
        f.write(%s\n % current_time+sea)

# 定义要执行的task 1
t1 = Pythonoperator(
    task_id=hello_world_args_1,# task_id
    python_callable=hello_world_args_1,# 指定要执行的函数
    dag=dag,# 指定归属的dag
    retries=2,# 重写失败重试次数,如果不写,则认使用dag类中指定的default_args中的设置
    provide_context=True,)

# 定义要执行的task 2
t2 = Pythonoperator(
    task_id=hello_world_args_2,# task_id
    python_callable=hello_world_args_2,# 指定归属的dag
    provide_context=True,)


t1>>t2

    

相关文章

功能概要:(目前已实现功能)公共展示部分:1.网站首页展示...
大体上把Python中的数据类型分为如下几类: Number(数字) ...
开发之前第一步,就是构造整个的项目结构。这就好比作一幅画...
源码编译方式安装Apache首先下载Apache源码压缩包,地址为ht...
前面说完了此项目的创建及数据模型设计的过程。如果未看过,...
python中常用的写爬虫的库有urllib2、requests,对于大多数比...