Problem description
I wrote this code to read data from S3 and write it back to S3, running on AWS Glue.
It is custom PySpark code; I am not using the script that Glue generates.
Here is the script:
from pyspark import SparkConf,SparkContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import sys
args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])
conf = SparkConf()
conf.set("spark.sql.parquet.compression.codec","snappy")
conf.set("spark.sql.parquet.writeLegacyFormat","true")
output_dir_path="s3://mygluecrawler/pysparkglueData/"
sc = SparkContext(conf=conf)  # pass conf, otherwise the parquet settings above are never applied
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
input_file = "s3://mygluecrawler/pysparkglueData/store.csv"
# Drop malformed rows while reading the CSV. Note: calling option("mode", ...)
# twice keeps only the last value, so FAILFAST was silently overriding
# DROPMALFORMED; only one mode is set here.
sparkDF = spark.read.format("csv").option("header","true").option("mode","DROPMALFORMED").load(input_file)
sparkDF = sparkDF.withColumn("Competitiondistance",sparkDF.Competitiondistance.cast('float'))
sparkDF = sparkDF.withColumn("CompetitionopenSinceMonth",sparkDF.CompetitionopenSinceMonth.cast('int'))
sparkDF = sparkDF.withColumn("CompetitionopenSinceYear",sparkDF.CompetitionopenSinceYear.cast('int'))
sparkDF = sparkDF.withColumn("Promo2",sparkDF.Promo2.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceWeek",sparkDF.Promo2SinceWeek.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceYear",sparkDF.Promo2SinceYear.cast('int'))
#sparkDF = sparkDF.fillna(value=0)
# Replace nulls with -99, but only in the two columns listed in subset
sparkDF = sparkDF.fillna(value=-99,subset=["Promo2SinceWeek","Promo2SinceYear"])
sparkColumns = sparkDF.select('StoreType','Competitiondistance','CompetitionopenSinceYear','Promo2SinceWeek','Promo2')
sparkColumns.write.format('parquet').partitionBy(['StoreType','Promo2']).mode('append').option("path",output_dir_path).save()
The script above runs fine.
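For reference, reading the partitioned output back looks like this (a minimal sketch; the filter values "a" and 1 are arbitrary examples, not from the original data):
# Minimal sketch: read the partitioned parquet back; filtering on the
# partition columns StoreType / Promo2 lets Spark prune partitions.
parquetDF = spark.read.parquet(output_dir_path)
parquetDF.filter((col("StoreType") == "a") & (col("Promo2") == 1)).show()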
Now I am doing the same thing, except instead of reading a file from S3 and writing to S3, I want Glue to read from Redshift and write back to Redshift.
Here I am reading data from Redshift and writing it to Redshift. Please check the script and tell me whether this is the correct approach. I need help.
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
import sys
args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])  # TempDir is needed below for redshift_tmp_dir
conf = SparkConf()
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
jdbcURL = "jdbc:redshift://my-redshift-database.cealcs9iyaaz.us-****-2.redshift.amazonaws.com:5439/dev?user=username&password=password"
# Read from Redshift via the spark-redshift data source. Without an explicit
# .format(...), spark.read.load() defaults to parquet and this read fails.
# The connector also needs S3 credentials for tempdir; forwarding the Spark
# credentials is one option (an aws_iam_role option is another).
sparkDf = spark.read.format("com.databricks.spark.redshift") \
    .option("url",jdbcURL) \
    .option("dbtable","glue_poc.s3toredshift") \
    .option("tempdir","s3://mygluecrawler/sparkLogs/") \
    .option("forward_spark_s3_credentials","true") \
    .load()
sparkDf.createOrReplaceTempView("people")
newdata = spark.sql("select * from people")
dynamic_df = DynamicFrame.fromDF(newdata,glueContext,"dynamic_df")
mapped_df = ResolveChoice.apply(frame = dynamic_df,choice = "make_cols",transformation_ctx = "mapped_df")
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = mapped_df,
    catalog_connection = "redshift-new-connection",
    connection_options = {"dbtable": "glue_poc.s3toredshift", "database": "dev"},
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink")
job.commit()
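One alternative I am considering: instead of embedding the username and password in the JDBC URL for the read, reuse the same catalog connection that the write already uses. A minimal sketch, assuming a Glue crawler has already catalogued the Redshift table (the database and table names below are placeholders):
# Sketch: read through the Glue Data Catalog instead of a raw JDBC URL.
# "my_catalog_db" and "glue_poc_s3toredshift" are hypothetical names.
source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database = "my_catalog_db",
    table_name = "glue_poc_s3toredshift",
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "source_dyf")
sparkDf = source_dyf.toDF()  # then the same temp-view / SQL steps as above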