问题描述
以下是使用的 DAG 格式, 我在 DAG 中的输入文件上使用 for 循环,并使用
重命名输出文件CurrentDateTime = datetime.datetime.today().strftime("%Y%m%d%H%M%s")
#if currentDateTIme = 20210406010203
outputfile = ''
outputfile1 = ''
outputfile2 = ''
inputfiles = ['input1','input2']
API_CALL_TASK1 = {
source : inputfile1
filename : outputfile1 #20210406010513
}
API_CALL_TASK2 = {
source : inputfile2
filename : outputfile2
}
for file in inputfiles:
if file == 'input1'
outputfile1 = f'inputFileName_{CurrentDateTime}' #20210406010303
outputfile = f'inputFileName_{CurrentDateTime}' #20210406010303
if file == 'input2'
outputfile2 = f'inputFileName_{CurrentDateTime}'
outputfile = f'inputFileName_{CurrentDateTime}'
MOVE_OUTPUT_TO_BUCKET_TASK = (
filename = f{outputfile} #20210406010423
)
MOVE_OUTPUT_TO_BUCKET_TASK >> API_CALL_TASK1 >> API_CALL_TASK1
在任务中 - API_CALL_TASK1
、API_CALL_TASK2
和 MOVE_OUTPUT_TO_BUCKET_TASK
文件名中的日期时间不同,因为每个任务触发的时间不同。
我想将相同的文件名从循环传递给 MOVE_OUTPUT_TO_BUCKET_TASK
和 API_CALL_TASK1
或 API_CALL_TASK2
解决方法
据我所知,您希望为循环中的两个输出以 YYYYMMDD
格式传递相同的日期时间。
Airflow 为我们提供了一组默认变量,可用于所有模板。因此,您可以使用它在整个模板中传递一个常量值。在您的情况下,我看到您希望执行日期作为输出的后缀。因此,您应该使用 ds_nodash
,根据 documentation,它检索 the execution date as YYYYMMDD
。
如果您使用带有 **kwargs 的 Python 运算符,您可以使用 kwargs['ds_nodash']
访问它。