如何使用 Flink 过滤具有公共字段但不同架构的镶木地板文件

问题描述

我有一个文件夹,里面有不同架构的镶木地板文件,所有文件都有一个共同的字段,保证存在。我想根据该字段过滤行并将其写回其他镶木地板文件

spark 中的类似操作将相当简单,看起来像

val filtered = rawsDF.filter(!col("id").isin(idsToDelete: _*))

问题是,如果我要扩展 ParquetInputFormat,我还必须提供可能不同的架构

ParquetInputFmt(path: Path,messageType: MessageType) extends ParquetInputFormat[User](path,messageType)

或者像这样使用源函数

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.sourceContext[String]): Unit = {
     val inputPath = "s3a://foo/day=01/"
    val conf = new Configuration()
    conf.setBoolean("recursive.file.enumeration",true)
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath),conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val Metadata = readFooter.getFileMetaData
    val schema = Metadata.getSchema
    val parquetFileReader = new ParquetFileReader(hadoopFile,ParquetReadOptions.builder().build())
    parquetFileReader.getFilteredRecordCount
    var pages: PageReadStore = null
    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages,new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group: Group = recordReader.read()
          val ind = group.getType.getFieldindex("id")
          val id = group.getInteger(ind,ind)
         if (!listofIds.contains(id))
              ctx.collect(?) // how can I get the original row ?

        }
      }
    }
  }
    

我对后者的问题是我无法获取原始数据

有什么想法吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)