在多个s3键上执行气流任务,然后执行下一个任务

问题描述

我有一个用例,我们在s3目录中有一组10个文件(比方说)。我们正在尝试将这些文件重命名为第二个目录中对应的映射的重命名文件名模式。我通过传递文件名动态创建了任务ID唯一性。

 for file in rename_list:
    rename_tak = RenameOperator(
        task_id="file_rename_task_{}".format(str(file.split(":")[0])),s3_conn_id=s3_CONN_ID,source_s3_bucket=source_bucket,destination_s3_bucket=destination_bucket,s3_key = source_prefix + source_key,rename_key = destination_key,output_prefix = output_prefix,dag=dag))

然后,我再次需要对其执行另一操作,最后将其移至最终的s3目录。这也将在for循环中运行,因为我们有多个文件。

现在,问题是第二个操作员/任务执行没有被调用,没有错误,但是Airflow日志显示“任务处于“已删除”状态,这不是有效的执行状态。必须清除任务为了运行。”重命名运算符的第一个任务全部成功,但是第二个运算符只是被删除,没有输出,没有日志。

任何反馈,这里可能出什么问题。

解决方法

气流工作流是静态的,不建议使用动态任务ID。据我所知,动态DAG可以通过一些黑客手段来实现,但Airflow并不直接支持。

现在,问题是第二个操作员/任务执行没有被调用,没有错误,但是Airflow日志显示“任务处于“已删除”状态,这不是有效的执行状态。必须清除任务为了运行。”重命名运算符的第一个任务全部成功,但是第二个运算符只是被删除,没有输出,没有日志。

在不知道如何在任务之间添加依赖性的情况下,很难确定问题所在。

我对问题的理解:

  1. 您有一个s3存储桶,该存储桶在路径(源)中包含一些文件
  2. 您需要对每个文件进行一些操作
  3. 将完成的文件移动到新的s3路径(目标)

如果我的理解是正确的,那么我设计工作流程的方式是:

download_files_from_source >> apply_some_operation_on_downloaded_files >> move_files_to_destination

如果跨工作人员共享文件系统,这将起作用。 DAG的每次运行都应具有自己的登台目录,以使属于不同DAG运行的文件不会重叠。

否则,您还可以编写一个自定义运算符来执行上一个解决方案中的所有三个任务,即从源下载文件,对下载的文件进行一些操作,然后将文件移到目标位置。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...