AWS Glue job - load a parquet file from S3 into an RDS jsonb column

Problem description

I have a parquet file in S3 with several columns, one of which holds JSON. The target RDS database has a table with the same columns, where that column is typed jsonb.

I want to copy the parquet file into RDS, but how do I convert that column to the jsonb data type, given that Glue does not support a json column type? When I try to insert the column as a string, I get the error below. Any ideas on how to write a JSON column into an RDS jsonb column?

 An error occurred while calling o145.pyWriteDynamicFrame. ERROR: column "json_column" is of type jsonb but expression is of type character varying
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)

DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    format = "parquet",
    connection_options = {"paths": ["s3://folder"], "recurse": True},
    transformation_ctx = "DataSource0")

# Each mapping must be a ("source", "source_type", "target", "target_type")
# 4-tuple; the json_column entry was missing its source type.
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("id", "long", "id", "long"),
        ("name", "string", "name", "string"),
        ("json_column", "string", "json_column", "string")],
    transformation_ctx = "Transform0")

DataSink0 = glueContext.write_dynamic_frame.from_catalog(
    frame = Transform0,
    database = "postgres",
    table_name = "table",
    transformation_ctx = "DataSink0")
job.commit()

Workaround

One approach is to use psycopg2 to connect to your RDS instance, iterate over your dataset, and load the rows directly.

How to insert JSONB into Postgresql with Python?