Delta Lake库的整理过程无Databricks

问题描述

我正在使用

  • AWS DMS从Oracle提取数据
  • 它降落到S3生料桶中
  • 我想使用AWS glue编写pyspark代码,而无需使用databricks产品将CDC数据与初始负载合并。

为了创建Delta表,我需要在spark上下文中专门导入哪些库?

我在“安全性配置,脚本库和作业参数(可选)”下的glue Dependent Path中添加了delta-core_2.12-0.7.0.jar。 我在错误下面得到错误--------

文件“ script_2020-11-08-19-29-39.py”,第54行,在fullload_str_metrics_df = spark.read.parquet('s3://rawbucket/.../fullload/.../STR_METRICS /LOAD00000001.parquet')文件“ /mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/readwriter.py”,第291行,位于拼花文件“ / mnt / yarn / usercache / root / appcache / application_1604863378634_0002 / container_1604863378634_0002_01_000001 / py4j-0.10.4-src.zip / py4j / java_gateway.py”,行1133,在调用中 deco文件中的第63行“ /mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/utils.py” 文件“ /mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py”第319行,位于get_return_value py4j.protocol.Py4JJavaError:调用时发生错误o74.parquet。

解决方法

¿您是否尝试过Glue BookmarksGlue Crawlers

书签可以监视S3目录并仅处理新文件。它具有提交功能,可在过程成功完成后提交新的“偏移”,并具有回滚功能,可返回到偏移的先前状态(例如,在执行ID为“作业执行ID>”之前返回。)。 >

您可以从其配置中为AWS Glue作业启用书签,一旦启用,在代码中,您就可以为传递了Transformation_ctx的特定源启用书签:

import sys
    
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv,['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'],args)

data_frame = glue_context.create_dynamic_frame.from_catalog(database = '<database>',table_name = '<table_name>',transformation_ctx = '<source_unique_id_for_this_job>').toDF()

some transformations...

sink = DynamicFrame.fromDF(data_frame,glue_context,"<df_name>")
    
glue_context.write_dynamic_frame.from_options(frame = sink,connection_type = "s3",connection_options = {"path": "<s3_path>"},format = "parquet")

job.commit()

在此示例中,我们使用AWS Glue爬网程序将S3输入目录解释为表。

我知道这远非实时处理,但每个批处理执行仅处理新数据。