TreeNodeException error when using PySpark

Problem description

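I am doing one-hot encoding inside a pipeline and then calling the fit method on it.

I have a DataFrame with both categorical and numeric columns, so I one-hot encode the categorical columns after running them through a StringIndexer.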

from pyspark.ml.feature import OneHotEncoderEstimator,StringIndexer,VectorAssembler


categoricalColumns = ['IncomeDetails','B2C','Gender','Occupation','POA_Status']
stages = []

for categoricalCol in categoricalColumns:
    # Index each string category, then one-hot encode the resulting index column
    stringIndexer = StringIndexer(inputCol = categoricalCol,outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],outputCols=[categoricalCol + "classvec"])
    stages += [stringIndexer,encoder]
    
    
label_stringIdx = StringIndexer(inputCol = 'target',outputCol = 'label')
stages += [label_stringIdx]


#new_col_array.remove("client_id")

numericCols = new_col_array
numericCols.append('age')


assemblerInputs = [c + "classvec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs,outputCol="features")
stages += [assembler]


from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(new_df1)
new_df1 = pipelineModel.transform(new_df1)
selectedCols = ['label','features'] + cols
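
One detail in the snippet above is worth checking: `numericCols = new_col_array` does not copy the list, so `numericCols.append('age')` also changes `new_col_array`, and running the cell a second time appends 'age' again. Below is a minimal sketch of building the assembler inputs without that side effect, using plain Python lists only. The helper name `build_assembler_inputs`, its `exclude` parameter, and the example column names are hypothetical, and dropping `client_id` is an assumption based on the commented-out line. This may well be unrelated to the TreeNodeException itself.

```python
def build_assembler_inputs(categorical_cols, numeric_cols,
                           extra_numeric=("age",), exclude=()):
    """Return the VectorAssembler input names without mutating the arguments."""
    # The list comprehension creates a new list, so numeric_cols is left untouched
    numeric = [c for c in numeric_cols if c not in exclude]
    for c in extra_numeric:
        # Skip names that are already present so re-running adds no duplicates
        if c not in numeric:
            numeric.append(c)
    return [c + "classvec" for c in categorical_cols] + numeric


categoricalColumns = ['IncomeDetails', 'B2C', 'Gender', 'Occupation', 'POA_Status']
# Hypothetical stand-in for new_col_array, which the post does not show
new_col_array = ["client_id", "n_events", "n_sessions"]

assemblerInputs = build_assembler_inputs(
    categoricalColumns, new_col_array, exclude=("client_id",)
)
```

The resulting `assemblerInputs` list can be passed to `VectorAssembler(inputCols=assemblerInputs, outputCol="features")` exactly as in the original code.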

I get this error:

Py4JJavaError: An error occurred while calling o2053.fit.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,tree:
Exchange hashpartitioning(client_id#*****,200)
+- *(4) HashAggregate(keys=[client_id#*****],functions=[],output=[client_id#*****])
   +- Exchange hashpartitioning(client_id#*****,200)
      +- *(3) HashAggregate(keys=[client_id#*****],output=[client_id#*****])
         +- *(3) HashAggregate(keys=[client_id#*****,event_name#27993],output=[client_id#27980])
            +- Exchange hashpartitioning(client_id#*****,event_name#27993,200)
               +- *(2) HashAggregate(keys=[client_id#*****,output=[client_id#*****,event_name#27993])
                  +- *(2) Project [client_id#*****,event_name#27993]
                     +- *(2) BroadcastHashJoin [client_id#*****],[Party_Code#*****],LeftSemi,BuildRight,false
                        :- *(2) Project [client_id#*****,event_name#27993]
                        :  +- *(2) Filter isnotnull(client_id#*****)
                        :     +- *(2) FileScan orc dbo.dp_clickstream_[client_id#*****,dt#28010] Batched: true,Format: ORC,Location: **PrunedInMemoryFileIndex**[s3n://processed/db-dbo-...,PartitionCount: 6,PartitionFilters: [isnotnull(dt#28010),(cast(dt#28010 as timestamp) >= 1610409600000000),(cast(dt#28010 as timest...,PushedFilters: [IsNotNull(client_id)],ReadSchema: struct<client_id:string,event_name:string>
                        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0,string,true]),false)


at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)


Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,tree:

My Spark version is 2.4.3.
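
A note on reading the trace: the plan printed with the exception contains only the ORC scan, the left-semi join, and the aggregations on `client_id`, and none of the pipeline stages. Since Spark evaluates lazily, `fit` is likely just the first action that executes the query behind `new_df1`. The trace as pasted stops at the first "Caused by:" line, and in a Java stack trace the last "Caused by:" entry is normally the root cause. The following is a minimal sketch for pulling those entries out of the full error text; the function name `cause_chain` is hypothetical, and the sample text reuses only lines from the trace above.

```python
def cause_chain(trace_text):
    """Return the 'Caused by:' headers of a Java stack trace, outermost first."""
    prefix = "Caused by:"
    causes = []
    for line in trace_text.splitlines():
        line = line.strip()
        if line.startswith(prefix):
            # Keep only the exception class and message after the prefix
            causes.append(line[len(prefix):].strip())
    return causes


# Lines copied from the error above; the full trace would contain more entries
sample = """Py4JJavaError: An error occurred while calling o2053.fit.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,tree:
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,tree:
"""

chain = cause_chain(sample)
root_cause = chain[-1] if chain else None
```

With the complete error text (for example copied from the notebook output), `root_cause` would hold the innermost exception, which is usually more informative than the wrapping TreeNodeException.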
