AWS Glue ETL Spark - String to Timestamp

Problem Description

I am trying to convert my CSV files to Parquet via an AWS Glue ETL job. At the same time, I would like to convert my datetime column (a string) into a timestamp format that Athena can recognize (Athena recognizes yyyy-MM-dd HH:mm:ss).

I have browsed through and applied many suggestions, but none of them worked.

Could you tell me which library I should import and what script to apply to the specific line? The code below is what AWS Glue suggests for converting CSV to Parquet, and it seems it could also be customized for my datetime conversion.

Thanks in advance.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "partition_db", table_name = "test_folder", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string")], transformation_ctx = "applymapping1")


resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://commercialanalytics/future_partition/test_folder_parquet"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Solution

You can create a function and call it with the Map transform; Map.apply runs the function over every record in the DynamicFrame.

import pandas as pd

def parse_date(rec):
    # Map.apply passes each record to this function as a dictionary;
    # reformat the datetime string in place and return the whole record.
    rec["col_name"] = pd.to_datetime(rec["col_name"]).strftime('%Y-%m-%d %H:%M:%S.%f')  # replace col_name with your column name
    return rec

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string")], transformation_ctx = "applymapping1")
custommapping1 = Map.apply(frame = applymapping1, f = parse_date, transformation_ctx = "custommapping1")

Another option is to convert to a Spark DataFrame and run a spark.sql(...) query.
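
For instance, a minimal sketch of that route, assuming the datasource0 and spark objects from the script above; the view name my_view, the column col1, and the source pattern dd/MM/yyyy HH.mm are placeholders rather than names from the original post:

df = datasource0.toDF()
df.createOrReplaceTempView("my_view")  # placeholder view name

# Parse the string into a real timestamp column; once written to Parquet,
# Athena can read it as a timestamp directly.
df_parsed = spark.sql(
    "SELECT *, to_timestamp(col1, 'dd/MM/yyyy HH.mm') AS col1_ts FROM my_view"
)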


Use Spark DataFrames. I found this to be the simplest approach:

from pyspark.sql.functions import from_unixtime, unix_timestamp, col

df = datasource0.toDF()

# withColumn takes the target column name as a string, not a Column object;
# columnname holds your column's name as a string.
df = df.withColumn(columnname, from_unixtime(unix_timestamp(col(columnname), "dd/MM/yyyy hh.mm")))
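
If the result then needs to go back through glueContext.write_dynamic_frame as in the generated script, the DataFrame can be wrapped in a DynamicFrame again. A minimal sketch; the name "converted" is arbitrary:

from awsglue.dynamicframe import DynamicFrame

# Convert the transformed DataFrame back into a DynamicFrame for the Glue sink
dyf = DynamicFrame.fromDF(df, glueContext, "converted")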
