在 PySpark 中将 RDD 转换为 DataFrame

问题描述

我无法在 pyspark 中将 RDD 数据转换为 Dataframe。

这是我写的代码

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType,Row
from pyspark.sql import *
spark = SparkSession \
        .builder \
        .appName("pyspark") \
        .master("local[3]") \
        .getorCreate()  
empdata = spark.sparkContext.textFile("/FileStore/tables/empdatarevised.txt").map(lambda x: x.split(","))        
schema = StructType([
        StructField("eid",IntegerType(),True),StructField("ename",StringType(),StructField("edept",StructField("esal",StructField("revsal",DoubleType(),])
df = spark.createDataFrame(data=empdata,schema=schema)
df.show()

我收到错误

org.apache.spark.SparkException:作业因阶段失败而中止:阶段 5.0 中的任务 0 失败 1 次,最近失败:阶段 5.0 中丢失任务 0.0 (TID 7) (ip-10-172-239- 64.us-west-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: 'TypeError: field eid: IntegerType can not accept object '100' in type ' .完整回溯如下:

我知道这可以通过 spark.read.format("csv").load("file.txt") 来完成,但我的意图是使用 StructType 将 RDD 转换为 Dataframe。

寻求您的帮助。

提前致谢。

解决方法

当您从 RDD 创建数据帧时,Spark 无法将字符串转换为整数/双精度数。您可以显式更改 RDD 中条目的类型,例如

empdata = (sc.textFile("/FileStore/tables/empdatarevised.txt")
             .map(lambda x: x.split(","))
             .map(lambda x: [int(x[0]),x[1],x[2],int(x[3]),float(x[4])])
          )

df = spark.createDataFrame(data=empdata,schema=schema)