PySpark-使用本机Spark函数将长时长毫秒转换为TimestampType

问题描述

我正在使用PySpark库读取JSON文件,处理数据并写回实木复合地板文件。

传入数据的日期字段以纪元为单位,以毫秒为单位。例如,1541106106796代表:Thursday,November 1,2018 9:01:46.796 PM

有效的解决方案使用Python datetime库:

def format_datetime(ts):
    return datetime.fromtimestamp(ts/1000.0)

...
get_timestamp = udf(lambda x: format_datetime(int(x)),TimestampType())
df = df.withColumn("timestamp",get_timestamp(df.ts))

是否存在仅使用本机Spark函数的解决方案?

解决方法

使用 from_unixtime 并从时间戳中提取毫秒,然后在末尾添加,最后转换为 timestamp 类型

df.show()
#+-------------+
#|           ts|
#+-------------+
#|1541106106796|
#+-------------+

df.withColumn("ts1",expr('concat_ws(".",from_unixtime(substring(ts,1,length(ts)-3),"yyyy-MM-dd HH:mm:ss"),substring(ts,length(ts)-2,length(ts)))').cast("timestamp")).\
show(10,False)
#+-------------+-----------------------+
#|ts           |ts1                    |
#+-------------+-----------------------+
#|1541106106796|2018-11-01 16:01:46.796|
#+-------------+-----------------------+

要创建 unixtime ,请使用unix_timestampregexp_extract函数。

Example:

df.show(10,False)
#+-----------------------------------------+
#|sample                                   |
#+-----------------------------------------+
#|Thursday,November 1,2018 9:01:46.796 PM|
#+-----------------------------------------+

df.withColumn("ts",concat_ws('',unix_timestamp(col("sample"),"E,MMMM d,yyyy hh:mm:ss.SSS a"),regexp_extract(col("sample"),"\\.(.*)\\s+",1))).\
show(10,False)
#+-----------------------------------------+-------------+
#|sample                                   |ts           |
#+-----------------------------------------+-------------+
#|Thursday,2018 9:01:46.796 PM|1541124106796|
#+-----------------------------------------+-------------+

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...