pandas_udf gives a pyarrow-related error

Problem description

I have a dataframe, and I want to get the lat_long of the given geolocations using the polyline library in PySpark.

+-----------------+--------------------+----------+                             
|              vid|        geolocations| trip_date|
+-----------------+--------------------+----------+
|58AC21B17LU006754|eurnE||yqU???????...|2020-02-22|
|2T3EWRFV0LW060632|uocbGfjniOK[Fs@rC...|2020-02-25|
|JTDP4RCE0LJ014008|w}wtFpdxtM????Q_@...|2020-02-25|
|4T1BZ1HK8KU029845|}rz_Dp~hhN?@?@???...|2020-03-03|

I am using pandas_udf and have enabled Apache Arrow:

import polyline
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, StringType

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

lat_long_udf = pandas_udf(lambda geoloc: polyline.decode(geoloc)[0], ArrayType(StringType()))
df1 = df.withColumn('lat_long', lat_long_udf(df.geolocations))

Calling df.count() returns a result, but running df.show() raises the following error:

 248, in init_stream_yield_batches
    for series in iterator:
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 110, in <lambda>
    verify_result_type(f(*a)), len(a[0])), arrow_return_type)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 1, in <lambda>
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/polyline/__init__.py", line 16, in decode
    return PolylineCodec().decode(expression, precision, geojson)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/polyline/codec.py", line 43, in decode
    lat_change, index = self._trans(expression, index)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/polyline/codec.py", line 31, in _trans
    byte = ord(value[index]) - 63
TypeError: ord() expected a character, but string of length 87 found




>>> print(pandas.__version__)
1.1.1
>>> print(numpy.__version__)
1.19.1
>>> import pyarrow
>>> print(pyarrow.__version__)
1.0.1

Solution

You are most likely getting this error because pandas_udf receives a pandas Series as input, and you are applying the decode function directly to that Series instead of to the individual values inside it.
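You can see the failure mechanism outside Spark: positional indexing into a pandas Series returns a whole encoded string, so the `ord()` call inside the decoder receives a multi-character string instead of a single character. A minimal sketch (the Series contents are placeholder strings, not real polylines):

```python
import pandas as pd

# Placeholder strings standing in for encoded polylines
s = pd.Series(["abcdefgh", "ijklmnop"])

# polyline's decoder effectively does ord(expression[index]); when the whole
# Series is passed as `expression`, s[0] is an entire string, not one character:
try:
    ord(s[0])
except TypeError as err:
    print(err)  # ord() expected a character, but string of length 8 found
```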

In the example below I expanded the lambda slightly so you can see this. I take the pandas Series, apply the polyline.decode function to each value in it, and return the resulting Series. Note that I also changed the return type to ArrayType(DoubleType()) instead of ArrayType(StringType()), since decode returns pairs of floats.

import pandas as pd
import polyline

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

....


df = spark.createDataFrame([["~sqU__pR_jpv@_pR"], ["_~t[__pR~qy@_pR"]], ["geolocations"])


@pandas_udf(ArrayType(DoubleType()))
def lat_long_udf(s: pd.Series) -> pd.Series:
    return s.apply(lambda x: polyline.decode(x)[0])


df1 = df.withColumn('decoded', lat_long_udf(df.geolocations))
df1.collect()
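The element-wise `Series.apply` is the crux of the fix, and it also shows why the return type must change: the decoder yields `(lat, lon)` tuples of floats. A sketch with a stand-in decoder (hypothetical, so it does not depend on the `polyline` package being installed):

```python
import pandas as pd

# Stand-in for polyline.decode; the real decode returns [(lat, lon), ...]
def decode_stub(expression):
    return [(38.5, -120.2), (40.7, -120.95)]  # hypothetical fixed result

s = pd.Series(["enc1", "enc2"])
first_points = s.apply(lambda x: decode_stub(x)[0])

# Each element is a tuple of floats, hence ArrayType(DoubleType()) rather
# than ArrayType(StringType()) as the UDF return type.
print(first_points.tolist())  # [(38.5, -120.2), (38.5, -120.2)]
```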
