Problem description
I imported a CSV file by parsing it with pandas, then converted the CSV file to Parquet format.
The time is read in as bigint and the date is read in as String.
I can change the date using the to_date function, i.e. df.withColumn('ObservationDate', to_date('ObservationDate')).
However, I am struggling to change the Time column into HH format and also to add the hours onto the Date column. I have looked through the Spark documentation but could not find what I need.
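For context, a minimal sketch of the setup described above; the file names, the pandas round trip and the existing spark session are assumptions for illustration only.
import pandas as pd
from pyspark.sql.functions import to_date

# hypothetical file names; pandas reads the CSV and writes it back out as Parquet
pdf = pd.read_csv("observations.csv")
pdf.to_parquet("observations.parquet")

# Spark reads the Parquet file: ObservationTime comes in as bigint, ObservationDate as string
df = spark.read.parquet("observations.parquet")

# converting the date works, but the Time column is still a plain integer
df = df.withColumn("ObservationDate", to_date("ObservationDate"))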
Solution
Use the format_string() function to convert the Time column into HH format, then use the to_timestamp() function to add the hours onto the Date column.
Example:
df.show()
#+---------------+-------------------+
#|ObservationTime| ObservationDate|
#+---------------+-------------------+
#| 1|2016-02-01T00:00:00|
#| 12|2016-02-01T00:00:00|
#+---------------+-------------------+
from pyspark.sql.functions import *
df.withColumn("HH",format_string("%02d",col("ObservationTime"))).\
withColumn("new_date",to_timestamp(concat(to_date(col("ObservationDate")),lit(' '),col("ObservationTime"))),"yyyy-MM-dd HH")).\
show()
#+---------------+-------------------+---+-------------------+
#|ObservationTime| ObservationDate| HH| new_date|
#+---------------+-------------------+---+-------------------+
#| 1|2016-02-01T00:00:00| 01|2016-02-01 01:00:00|
#| 12|2016-02-01T00:00:00| 12|2016-02-01 12:00:00|
#+---------------+-------------------+---+-------------------+
Another option: lpad the hours, convert them to seconds with unix_timestamp using the HH format, and add them to the date, with UTC set as the Spark session time zone.
df.show(false)
df.printSchema()
/**
* +---------------+-------------------+
* |ObservationTime|ObservationDate |
* +---------------+-------------------+
* |1 |2016-02-01T00:00:00|
* |12 |2016-02-01T00:00:00|
* +---------------+-------------------+
*
* root
* |-- ObservationTime: integer (nullable = false)
* |-- ObservationDate: string (nullable = true)
*/
spark.conf.set("spark.sql.session.timeZone","UTC")
df.withColumn("ObservationTime",lpad($"ObservationTime",2,"0"))
.withColumn("new_ObservationDate",(unix_timestamp($"ObservationDate".cast("timestamp")).cast("long") + unix_timestamp($"ObservationTime","HH").cast("long"))
.cast("timestamp")
)
.show(false)
/**
* +---------------+-------------------+-------------------+
* |ObservationTime|ObservationDate |new_ObservationDate|
* +---------------+-------------------+-------------------+
* |01 |2016-02-01T00:00:00|2016-02-01 01:00:00|
* |12 |2016-02-01T00:00:00|2016-02-01 12:00:00|
* +---------------+-------------------+-------------------+
*/
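A rough PySpark equivalent of the Scala snippet above, assuming the same column names; this is a sketch of the same approach, not verified against the exact output shown.
from pyspark.sql.functions import col, lpad, unix_timestamp

# keep the arithmetic time-zone-safe, as in the Scala version
spark.conf.set("spark.sql.session.timeZone", "UTC")

df.withColumn("ObservationTime", lpad(col("ObservationTime"), 2, "0")) \
  .withColumn("new_ObservationDate",
              (unix_timestamp(col("ObservationDate").cast("timestamp"))
               + unix_timestamp(col("ObservationTime"), "HH")).cast("timestamp")) \
  .show(truncate=False)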
Here is my attempt.
import pyspark.sql.functions as f
df.show(10,False)
df.withColumn('ObservationTime',f.lpad('ObservationTime',2,'0')) \
.withColumn('ObservationDate',f.to_timestamp('ObservationDate')) \
.withColumn('ObservationTimestamp',f.from_unixtime(f.unix_timestamp('ObservationDate') + f.unix_timestamp('ObservationTime','H'))) \
.show(10,False)
+---------------+-------------------+
|ObservationTime|ObservationDate |
+---------------+-------------------+
|1 |2016-02-01T00:00:00|
|14 |2016-02-01T00:00:00|
+---------------+-------------------+
+---------------+-------------------+--------------------+
|ObservationTime|ObservationDate |ObservationTimestamp|
+---------------+-------------------+--------------------+
|01 |2016-02-01 00:00:00|2016-02-01 01:00:00 |
|14 |2016-02-01 00:00:00|2016-02-01 14:00:00 |
+---------------+-------------------+--------------------+
I have used the unix_timestamp function with 'H' for ObservationTime, but in Spark 2.x it would be 'HH'.
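A sketch of the Spark 2.x variant hinted at above, assuming only the hour pattern needs to change; same columns as before, output not shown.
import pyspark.sql.functions as f

# Spark 2.x: use the two-digit 'HH' pattern for the zero-padded hour column
df.withColumn('ObservationTime', f.lpad('ObservationTime', 2, '0')) \
  .withColumn('ObservationDate', f.to_timestamp('ObservationDate')) \
  .withColumn('ObservationTimestamp',
              f.from_unixtime(f.unix_timestamp('ObservationDate') + f.unix_timestamp('ObservationTime', 'HH'))) \
  .show(10, False)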