从List <String> filePaths转发文件

问题描述

我们有一个来自数据库表的文件路径列表,其中包含创建时的时间戳。试图弄清楚我们如何使用db中的文件路径列表将仅这些文件从nfs转发到kafka sink。

现在,我正在使用ContinuousFileMonitoringFunction的定制版本,该文件夹的根目录将包含数据库显示的所有文件。此操作非常慢,因为要遍历文件夹以收集有关更新文件的信息,因为该文件夹太大,几乎没有TB的数据。

Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));

DataSet<String> ds  = result.toDataSet();

ds具有应发送到kafka的所有文件路径。

以下是我打算实施的想法。但是考虑到flink并行性,flink库支持等,有没有一种更好的有效方法

public class FileContentMap extends RichFlatMapFunction<String,String> {

      

    @Override
    public void flatMap(String input,Collector<String> out) throws Exception {

       
       
        // get the file path
        String filePath = input;

        String fileContent = readFile(input);

    out.collect(fileCOntent);

       
    }

    @Override
    public void open(Configuration config) {
       
    }
}

DataSet<String> contectDataSet = ds.map(new FileCOntentMap());

contectDataSet.addSink(kafkaProducer);

解决方法

您的方法对我来说似乎不错。也许更有效的方法是创建一个RichParallelSourceFunction,其中,在open()方法中,您对数据库进行了调用以获取已更新文件的列表,并在其中建立了一个内存列表。该特定源子任务(例如filePath.hashCode() % numSubTasks == mySubTask之类的文件)应该由您的FileContentMap处理。