How do I convert strings in an Apache Flink DataStream in Scala?

Problem description

I am writing a Scala job that processes a CSV file in Apache Flink with the DataStream API. I need to clean up the format of some columns and then convert them to the correct types.

My current code looks like this:

package org.myorg.tareac

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.{FilterFunction,FlatMapFunction,MapFunction}

object BatchJob {

  def main(args: Array[String]) {
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    
    val inputPath = "file:////home/almu/Practicas_BigData/Tarea_Flink_B/RIA_exportacion_datos_diarios_Huelva_20140206.csv"
    
    val input = env.readCsvFile[(String,String,String)](inputPath,fieldDelimiter=";",ignoreFirstLine=true)


    input.print()
    
    val casted_data = input.flatMap((IDPROVINCIA: String,SPROVINCIA: String,IDESTACION: String,SESTACION: String,FECHA: String,ANIO: String,TEMPMAX: String,HORMINTEMPMAX: String,TEMPMIN: String,HORMINTEMPMIN: String,TEMPMEDIA: String,HUMEDADMAX: String,HUMEDADMIN: String,HUMEDADMEDIA: String,VELVIENTO: String,DIRVIENTO: String,RADIACION: String,PRECIPitacION: String) => {
    
                                  IDESTACION.replace("\"","").cast(Types.Int);
                                  SESTACION.replace("\"","");
                                  FECHA.substring(6,9).cast(Int);
                                  RADIACION.replace(",",".").replace("",0).cast(Double);
                                  PRECIPitacION.replace(",0).cast(Double) 
                                 })
                                 

    // execute program
    env.execute("Flink Batch CSV Scala Processing")
  }
}

However, when I run mvn clean package, I get this error:

[ERROR] /home/almu/Practicas_BigData/Tarea_Flink_B/tareac/src/main/scala/batch/ProcessFileBatch.scala:54: error: value cast is not a member of String
[ERROR]                                   IDESTACION.replace("\"","").cast(Types.Int);
[ERROR]                                                                ^
[ERROR] one error found

How can I do the cast correctly?

Solution

File contents:

Jack,12,num_123,北京

Code:

  val input = env.readCsvFile[(String, String, String)](inputPath, fieldDelimiter = ",", ignoreFirstLine = false)

    input
      .map((value: (String, String, String)) => {
        (value._1, value._2.toInt, value._3.substring(value._3.indexOf("_") + 1).toInt)
      })
      .print()

Result:

(Jack,12,123)
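The substring/indexOf trick in the map above can be checked in plain Scala, without Flink. This is just a quick illustration; the sample value is made up to match the file contents:

```scala
object SuffixDemo {
  def main(args: Array[String]): Unit = {
    // Pull the numeric suffix out of a value like "num_123":
    // indexOf("_") finds the separator, substring takes everything after it.
    val raw = "num_123"
    val n = raw.substring(raw.indexOf("_") + 1).toInt
    println(n) // prints 123
  }
}
```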

In short: replace .cast(Types.Int) with .toInt — Scala strings have no cast method, but StringOps provides toInt, toDouble, and friends.
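The same idea extends to the quoted and decimal-comma columns in the question: clean the string with plain String methods, then convert with toInt / toDouble. A minimal sketch in plain Scala (no Flink required), using hypothetical sample values shaped like the asker's columns:

```scala
object CastDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample values shaped like the columns in the question.
    val idEstacion = "\"101\""  // integer wrapped in literal quotes
    val radiacion  = "12,5"     // decimal written with a comma
    val precip     = ""         // possibly-empty numeric field

    // Strip the quote characters, then convert with toInt.
    val id: Int = idEstacion.replace("\"", "").toInt

    // Normalise the decimal separator, then convert with toDouble.
    val rad: Double = radiacion.replace(",", ".").toDouble

    // toInt/toDouble throw NumberFormatException on empty or malformed
    // input; toDoubleOption (Scala 2.13+) returns None instead, so a
    // default can be supplied with getOrElse.
    val pre: Double = precip.replace(",", ".").toDoubleOption.getOrElse(0.0)

    println((id, rad, pre)) // prints (101,12.5,0.0)
  }
}
```

Note that toDoubleOption requires Scala 2.13 or later; on older versions, scala.util.Try("".toDouble).getOrElse(0.0) achieves the same effect.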