How do I write this as a Spark pipeline?

Problem description

How can I turn the following into a Spark pipeline?

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.{col, udf}
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

// feature_list, df_train, SOLD_RATIO_TRAIN and NWORKER are defined earlier in the notebook (not shown)
val vectorAssembler = new VectorAssembler().setInputCols(feature_list.toArray).setOutputCol("features").setHandleInvalid("skip")
val df_train_assembled = vectorAssembler.transform(df_train).select("features", "sold_limit")
// Convert the (possibly sparse) assembled vectors to dense vectors
val toDense = udf((v: org.apache.spark.ml.linalg.Vector) => v.toDense)
val df_train_dense = df_train_assembled.withColumn("features", toDense(col("features")))

val xgbParam = Map(
  "eta" -> 0.05f, "seed" -> 0, "objective" -> "binary:logistic", "max_depth" -> 13,
  "min_child_weight" -> 1, "base_score" -> (1 - SOLD_RATIO_TRAIN), "eval_metric" -> "logloss",
  "scale_pos_weight" -> (1 - SOLD_RATIO_TRAIN) / (SOLD_RATIO_TRAIN * 10), "early_stopping_rounds" -> 1, // 10
  "verbosity" -> 3, "num_round" -> dbutils.widgets.get("n_rounds").toInt, "num_workers" -> NWORKER
)

val xgbClassifier = new XGBoostClassifier(xgbParam).setFeaturesCol("features").setLabelCol("sold_limit")

I can create the pipeline with:
import org.apache.spark.ml.Pipeline

// Create the training pipeline
val pipeline = new Pipeline()
  .setStages(Array(vectorAssembler, xgbClassifier))

But that is not enough, because it skips this step:

val df_train_dense = vectorAssembler.transform(df_train).withColumn("features", toDense(col("features")))
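One way to keep that densify step is to make it a pipeline stage of its own. The sketch below assumes Spark 3.x and subclasses Spark ML's developer API UnaryTransformer (the base class of built-in stages such as ElementwiseProduct), reusing the question's toDense logic. The class name VectorDensifier and the column names features_raw and features are hypothetical. UnaryTransformer refuses to overwrite an existing column, so the assembler would write to features_raw and the densifier to features.

```scala
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.linalg.{SQLDataTypes, Vector, Vectors}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataType

// Hypothetical stage: reads a Vector column and writes the same vectors, densified,
// into a new column. UnaryTransformer throws if the output column already exists,
// so inputCol and outputCol must differ.
class VectorDensifier(override val uid: String)
    extends UnaryTransformer[Vector, Vector, VectorDensifier] {

  def this() = this(Identifiable.randomUID("vecDensifier"))

  // Same logic as the toDense UDF in the question.
  override protected def createTransformFunc: Vector => Vector = _.toDense

  // Public handle for Spark ML's vector SQL type.
  override protected def outputDataType: DataType = SQLDataTypes.VectorType
}

object VectorDensifierDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("densify").getOrCreate()
    import spark.implicits._

    // One sparse vector of size 3 with a single non-zero entry.
    val df = Seq(Tuple1(Vectors.sparse(3, Array(1), Array(2.0)))).toDF("features_raw")

    val densifier = new VectorDensifier().setInputCol("features_raw").setOutputCol("features")
    densifier.transform(df).show(truncate = false)

    spark.stop()
  }
}
```

As written the stage cannot be saved with pipeline.save; mixing in DefaultParamsWritable (with a companion object extending DefaultParamsReadable) would be needed for that, and classes defined inside a Databricks notebook may still not reload cleanly.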

Can this step be incorporated into the pipeline at all? And how can I avoid passing df_train in as input?
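A possible answer to both questions, sketched under assumptions: put a densifying stage between the assembler and the classifier, and hand the raw DataFrame to pipeline.fit once instead of calling vectorAssembler.transform by hand. This variant uses Spark's built-in SQLTransformer with a session-registered SQL function rather than a custom class. The function name to_dense, the helper buildPipeline and the toy columns f1/f2 are hypothetical, and LogisticRegression stands in for xgbClassifier only so the sketch runs without xgboost4j-spark.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{SQLTransformer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, SparkSession}

object DensePipelineSketch {
  def buildPipeline(spark: SparkSession, featureList: Seq[String]): Pipeline = {
    // Register the question's toDense logic as a SQL function in this session.
    // Hypothetical name; it must be registered again in any session that reuses the model.
    spark.udf.register("to_dense", (v: Vector) => v.toDense)

    val vectorAssembler = new VectorAssembler()
      .setInputCols(featureList.toArray)
      .setOutputCol("features_raw")
      .setHandleInvalid("skip")

    // SQLTransformer substitutes the stage's input DataFrame for __THIS__.
    val densify = new SQLTransformer()
      .setStatement("SELECT *, to_dense(features_raw) AS features FROM __THIS__")

    // Stand-in for xgbClassifier so the sketch runs without xgboost4j-spark.
    val classifier = new LogisticRegression()
      .setFeaturesCol("features")
      .setLabelCol("sold_limit")

    new Pipeline().setStages(Array(vectorAssembler, densify, classifier))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("pipeline").getOrCreate()
    import spark.implicits._

    // Toy rows standing in for df_train (hypothetical feature columns f1, f2).
    val dfTrain: DataFrame = Seq(
      (1.0, 0.0, 1.0), (0.0, 1.0, 0.0), (2.0, 0.0, 1.0), (0.0, 2.0, 0.0)
    ).toDF("f1", "f2", "sold_limit")

    // The raw DataFrame goes in once; assembling and densifying happen inside the pipeline.
    val model: PipelineModel = buildPipeline(spark, Seq("f1", "f2")).fit(dfTrain)
    model.transform(dfTrain).select("features", "sold_limit", "prediction").show(truncate = false)

    spark.stop()
  }
}
```

In the real notebook the last stage would be the question's xgbClassifier (its setFeaturesCol("features") already matches the densified column) and the feature list would be feature_list. The trade-off against a custom Transformer class is that to_dense lives in the Spark session rather than in the saved model, so it would have to be registered before a reloaded PipelineModel is used.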
