通过Airflow将文件从GCS复制到Google云端硬盘

问题描述

我想实例化一个任务(通过气流),该任务会将Google云存储中存储桶中的文件复制到驱动器中。

我使用位于的专用运算符:

from airflow.contrib.operators.gcs_to_gdrive_operator import GcsToGDriveOperator

然后操作员:

copy_files = GcsToGDriveOperator(
        task_id="copy_files",source_bucket=GCS_BUCKET_ID,source_object='{}/{}/forecasted/*'.format(COUNTRY_TRIGRAM,PRED_START_RANGE),destination_object="content/drive/Shared Drives/FORECAST_TEST",gcp_conn_id="airflow_service_account_conn_w_drive"
    )

任务成功完成,但是不要将文件复制到“目标对象”中,这是我不确定要放入什么内容的部分。

解决方法

查看Airflow GcsToGDriveOperator源代码,我假设Airflow利用gcs_hook.download()方法从GCS下载文件,并gdrive_hook.upload_file()将这些对象上传到目标Gdrive位置。

鉴于以上所述,gcs_hook.download()方法记录了成功操作结果的每个动作:

self.log.info('File downloaded to %s',filename)

类似地,gdrive_hook.upload_file()将每个文件上载的迭代记录为一条日志消息:

self.log.info("File %s uploaded to gdrive://%s.",local_location,remote_location)

即使任务成功完成,我相信您也可以在特定任务内的气流logs中捕获上述事件,查找从GcsToGDriveOperator()定义派生的实际源位置和目标位置路径。 / p>

您甚至可以考虑连接到GKE集群并启动kubectl命令行工具的Airflow工作人员日志检查:

kubectl logs deployment/airflow-worker -n $(kubectl get ns| grep composer*| awk '{print $1}') -c airflow-worker | grep 'Executing copy'

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...