问题描述
我当前的用例是,在基于ETL的服务中(NOTE
:ETL服务未使用Glue ETL,它是一个独立的服务),我正在从AWS Redshift集群中获取一些数据到S3中。然后将S3中的数据馈送到T和L作业中。我想将元数据填充到Glue Catalog中。最基本的解决方案是使用Glue Crawler,但是该搜寻器运行大约1小时20分钟(很多s3分区)。我遇到的另一个解决方案是使用Glue API。但是,我同样面临数据类型定义的问题。
有什么办法,我可以创建/更新Glue目录表,其中S3中有数据,并且仅在提取过程中才知道数据类型。
但是,在运行T和L作业时,目录中的数据类型也应该很容易获得。
解决方法
要在ETL过程中创建,更新数据目录,可以使用以下内容:
更新:
additionalOptions = {"enableUpdateCatalog": True,"updateBehavior": "UPDATE_IN_DATABASE"}
additionalOptions["partitionKeys"] = ["partition_key0","partition_key1"]
sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform,database=<dst_db_name>,table_name=<dst_tbl_name>,transformation_ctx="write_sink",additional_options=additionalOptions)
job.commit()
以上内容可用于更新架构。您还可以选择在updateBehavior
或LOG
(default)之间选择UPDATE_IN_DATABASE
。
创建
要在ETL期间在数据目录中创建新表,可以遵循以下示例:
sink = glueContext.getSink(connection_type="s3",path="s3://path/to/data",enableUpdateCatalog=True,updateBehavior="UPDATE_IN_DATABASE",partitionKeys=["partition_key0","partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>,catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
您可以使用setCatalogInfo
指定数据库和新表的名称。
您还可以选择使用enableUpdateCatalog
参数在数据目录中更新分区,然后指定partitionKeys
。
有关功能的更详细说明,请参见here。
,找到了解决问题的方法,最终我利用了Glue Catalog API使其无缝和快速。 我创建了一个与Glue Catalog交互的界面,并针对各种数据源覆盖了这些方法。在将数据加载到S3之后,我立即触发查询以从源中获取架构,然后接口完成其工作。