问题描述
我有一个文件夹,里面有不同架构的镶木地板文件,所有文件都有一个共同的字段,保证存在。我想根据该字段过滤行并将其写回其他镶木地板文件。
spark 中的类似操作将相当简单,看起来像
val filtered = rawsDF.filter(!col("id").isin(idsToDelete: _*))
问题是,如果我要扩展 ParquetInputFormat,我还必须提供可能不同的架构
ParquetInputFmt(path: Path,messageType: MessageType) extends ParquetInputFormat[User](path,messageType)
或者像这样使用源函数:
class ParquetSourceFunction extends SourceFunction[String]{
override def run(ctx: SourceFunction.sourceContext[String]): Unit = {
val inputPath = "s3a://foo/day=01/"
val conf = new Configuration()
conf.setBoolean("recursive.file.enumeration",true)
conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath),conf)
val readFooter = ParquetFileReader.open(hadoopFile)
val Metadata = readFooter.getFileMetaData
val schema = Metadata.getSchema
val parquetFileReader = new ParquetFileReader(hadoopFile,ParquetReadOptions.builder().build())
parquetFileReader.getFilteredRecordCount
var pages: PageReadStore = null
try {
while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
val rows = pages.getRowCount
val columnIO = new ColumnIOFactory().getColumnIO(schema)
val recordReader = columnIO.getRecordReader(pages,new GroupRecordConverter(schema))
(0L until rows).foreach { _ =>
val group: Group = recordReader.read()
val ind = group.getType.getFieldindex("id")
val id = group.getInteger(ind,ind)
if (!listofIds.contains(id))
ctx.collect(?) // how can I get the original row ?
}
}
}
}
我对后者的问题是我无法获取原始数据
有什么想法吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)