如何在 Airflow 的每个循环中获得统一的当前日期时间?

问题描述

以下是使用的 DAG 格式, 我在 DAG 中的输入文件上使用 for 循环,并使用

重命名输出文件
CurrentDateTime = datetime.datetime.today().strftime("%Y%m%d%H%M%s")

#if currentDateTIme = 20210406010203

outputfile = ''
outputfile1 = ''
outputfile2 = ''
inputfiles = ['input1','input2']

API_CALL_TASK1 = {
source : inputfile1
filename : outputfile1 #20210406010513
}

API_CALL_TASK2 = {
source : inputfile2
filename : outputfile2 
}

for file in inputfiles:
    if file == 'input1'
        outputfile1 = f'inputFileName_{CurrentDateTime}' #20210406010303
        outputfile = f'inputFileName_{CurrentDateTime}' #20210406010303
    if file == 'input2'
        outputfile2 = f'inputFileName_{CurrentDateTime}' 
        outputfile = f'inputFileName_{CurrentDateTime}'        
    MOVE_OUTPUT_TO_BUCKET_TASK = (
                                 filename = f{outputfile} #20210406010423
                                 )

MOVE_OUTPUT_TO_BUCKET_TASK >> API_CALL_TASK1 >> API_CALL_TASK1

在任务中 - API_CALL_TASK1API_CALL_TASK2MOVE_OUTPUT_TO_BUCKET_TASK 文件名中的日期时间不同,因为每个任务触发的时间不同。

如何获取每个文件的统一日期时间?

我想将相同的文件名从循环传递给 MOVE_OUTPUT_TO_BUCKET_TASKAPI_CALL_TASK1API_CALL_TASK2

解决方法

据我所知,您希望为循环中的两个输出以 YYYYMMDD 格式传递相同的日期时间。

Airflow 为我们提供了一组默认变量,可用于所有模板。因此,您可以使用它在整个模板中传递一个常量值。在您的情况下,我看到您希望执行日期作为输出的后缀。因此,您应该使用 ds_nodash,根据 documentation,它检索 the execution date as YYYYMMDD

如果您使用带有 **kwargs 的 Python 运算符,您可以使用 kwargs['ds_nodash'] 访问它。