Pyspark format date

Problem description

I imported a CSV file by parsing it with pandas, then converted the CSV file to Parquet format.

The time is read in as bigint and the date is read in as String.

I can change the date using the to_date function, i.e. df.withColumn('ObservationDate', to_date('ObservationDate')).

However, I am struggling to change the time column to HH format and also to add the hours to the date column. I have looked through the Spark documentation but could not find what I needed.
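A minimal sketch of the setup described above (the sample rows and the existing spark session are assumptions for illustration):

from pyspark.sql.functions import to_date

# assumed sample data matching the description: time as bigint, date as string
df = spark.createDataFrame(
    [(1, "2016-02-01T00:00:00"), (12, "2016-02-01T00:00:00")],
    ["ObservationTime", "ObservationDate"],
)

# converting the date column already works
df.withColumn("ObservationDate", to_date("ObservationDate")).show()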


Solution

Use the format_string() function to convert the time column to HH format, then use the to_timestamp() function to add the new date column.

Example:

df.show()
#+---------------+-------------------+
#|ObservationTime|    ObservationDate|
#+---------------+-------------------+
#|              1|2016-02-01T00:00:00|
#|             12|2016-02-01T00:00:00|
#+---------------+-------------------+

from pyspark.sql.functions import *

df.withColumn("HH",format_string("%02d",col("ObservationTime"))).\
withColumn("new_date",to_timestamp(concat(to_date(col("ObservationDate")),lit(' '),col("ObservationTime"))),"yyyy-MM-dd HH")).\
show()
#+---------------+-------------------+---+-------------------+
#|ObservationTime|    ObservationDate| HH|           new_date|
#+---------------+-------------------+---+-------------------+
#|              1|2016-02-01T00:00:00| 01|2016-02-01 01:00:00|
#|             12|2016-02-01T00:00:00| 12|2016-02-01 12:00:00|
#+---------------+-------------------+---+-------------------+
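On Spark 3.x there is also a shorter route that skips the string round-trip (a sketch, not part of the original answer, assuming the same schema): add the hours directly as an interval with make_interval.

from pyspark.sql.functions import expr

# Spark 3.x sketch: add ObservationTime hours to the parsed date as an interval
df.withColumn("new_date",
    expr("to_timestamp(ObservationDate) + make_interval(0, 0, 0, 0, cast(ObservationTime as int))")).\
show()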

Another option -

Use lpad to convert the time column to HH format and unix_timestamp to add the hours, with the Spark session time zone set to UTC (so that an hour string such as "01" parsed with unix_timestamp evaluates to exactly that many seconds past the epoch).

    df.show(false)
    df.printSchema()

    /**
      * +---------------+-------------------+
      * |ObservationTime|ObservationDate    |
      * +---------------+-------------------+
      * |1              |2016-02-01T00:00:00|
      * |12             |2016-02-01T00:00:00|
      * +---------------+-------------------+
      *
      * root
      * |-- ObservationTime: integer (nullable = false)
      * |-- ObservationDate: string (nullable = true)
      */

    import org.apache.spark.sql.functions._
    import spark.implicits._

    spark.conf.set("spark.sql.session.timeZone","UTC")
    df.withColumn("ObservationTime",lpad($"ObservationTime",2,"0"))
      .withColumn("new_ObservationDate",(unix_timestamp($"ObservationDate".cast("timestamp")).cast("long") + unix_timestamp($"ObservationTime","HH").cast("long"))
          .cast("timestamp")
      )
      .show(false)

    /**
      * +---------------+-------------------+-------------------+
      * |ObservationTime|ObservationDate    |new_ObservationDate|
      * +---------------+-------------------+-------------------+
      * |01             |2016-02-01T00:00:00|2016-02-01 01:00:00|
      * |12             |2016-02-01T00:00:00|2016-02-01 12:00:00|
      * +---------------+-------------------+-------------------+
      */
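Since the question is about PySpark, here is a rough Python equivalent of the Scala snippet above (a sketch assuming the same column names and an existing spark session):

from pyspark.sql import functions as f

spark.conf.set("spark.sql.session.timeZone", "UTC")
df.withColumn("ObservationTime", f.lpad(f.col("ObservationTime").cast("string"), 2, "0")) \
  .withColumn("new_ObservationDate",
              (f.unix_timestamp(f.col("ObservationDate").cast("timestamp")) +
               f.unix_timestamp("ObservationTime", "HH")).cast("timestamp")) \
  .show(truncate=False)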

Here is my attempt.

import pyspark.sql.functions as f

df.show(10,False)

df.withColumn('ObservationTime',f.lpad('ObservationTime',2,'0')) \
  .withColumn('ObservationDate',f.to_timestamp('ObservationDate')) \
  .withColumn('ObservationTimestamp',f.from_unixtime(f.unix_timestamp('ObservationDate') + f.unix_timestamp('ObservationTime','H'))) \
  .show(10,False)

+---------------+-------------------+
|ObservationTime|ObservationDate    |
+---------------+-------------------+
|1              |2016-02-01T00:00:00|
|14             |2016-02-01T00:00:00|
+---------------+-------------------+

+---------------+-------------------+--------------------+
|ObservationTime|ObservationDate    |ObservationTimestamp|
+---------------+-------------------+--------------------+
|01             |2016-02-01 00:00:00|2016-02-01 01:00:00 |
|14             |2016-02-01 00:00:00|2016-02-01 14:00:00 |
+---------------+-------------------+--------------------+

I used the unix_timestamp function on ObservationTime with the 'H' pattern, but in Spark 2.x it would be 'HH'.
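For Spark 2.x the same chain would look like this (a sketch; after the lpad the hour is always two digits, so the 'HH' pattern matches):

import pyspark.sql.functions as f

# Spark 2.x variant: parse the zero-padded hour with 'HH' instead of 'H'
df.withColumn('ObservationTime', f.lpad('ObservationTime', 2, '0')) \
  .withColumn('ObservationDate', f.to_timestamp('ObservationDate')) \
  .withColumn('ObservationTimestamp',
              f.from_unixtime(f.unix_timestamp('ObservationDate') + f.unix_timestamp('ObservationTime', 'HH'))) \
  .show(10, False)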