问题描述
我有一个 .csv 文件,我正在尝试使用 Python 使用 Azure Databricks 对其进行修改。
该文件有两列timeOfMeasurement
(格式为2021-01-04T07:07:45.098+0000
)和eventProcessedUtcTime
(格式为2021-01-04T07:07:45.6768856Z
)。我想用这些格式化为日期时间的字符串添加新列。
当我使用单个变量时,以下格式有效:
import datetime
# EventProcessedUtcTime
a = '2021-01-04T07:07:45.6768856Z'
x = datetime.datetime.strptime((a[:26]).strip(),'%Y-%m-%dT%H:%M:%s.%f')
print(x)
print(type(x))
# timeOfMeasurement
b = '2021-01-04T07:07:45.098+0000'
y = datetime.datetime.strptime((b[:23]).strip(),'%Y-%m-%dT%H:%M:%s.%f')
#y = (b[:23]).strip()
print(y)
print(type(y))
并创建以下输出:
2021-01-04 07:07:45.676885
<class 'datetime.datetime'>
2021-01-04 07:07:45.098000
<class 'datetime.datetime'>
但是当我试图为整个数据框实现这一点时
import datetime
df = spark.read.format('csv').options(header='true',inferSchema='true').load('/mnt/data/values.csv')
df_adjusted = (df.withColumn("timeOfMeasurementDatetime",datetime.datetime.strptime((df.timeOfMeasurement[:23]).strip(),'%Y-%m-%dT%H:%M:%s.%f'))
我收到一条错误消息:
TypeError: startPos 和 length 必须是同一类型。分别得到
我不知道问题出在哪里,因为我对整个主题完全陌生,如果有人能帮助我,我会非常高兴。
解决方法
为此您需要使用 PySpark 函数,而不是使用 Python 的内置函数。您有以下可能性:
- 如果您的字符串采用 ISO8601 时间格式,那么您只需在相应的列上执行
.cast('timestamp')
:
>>> df = spark.createDataFrame([('2021-01-04T07:07:45.098+0000','2021-01-04T07:07:45.6768856Z')],['ts1','ts2'])
>>> df.show(truncate=False)
+----------------------------+----------------------------+
|ts1 |ts2 |
+----------------------------+----------------------------+
|2021-01-04T07:07:45.098+0000|2021-01-04T07:07:45.6768856Z|
+----------------------------+----------------------------+
>>> df.select(df.ts1.cast('timestamp'),df.ts2.cast('timestamp')).show(truncate=False)
+-----------------------+--------------------------+
|ts1 |ts2 |
+-----------------------+--------------------------+
|2021-01-04 08:07:45.098|2021-01-04 08:07:45.676885|
+-----------------------+--------------------------+
- 如果您的字符串有一些自定义格式,您可以使用内置函数,例如 to_timestamp,并提供格式字符串。