Apache Flink 1.13: converting a Table to a DataSet?

Problem description

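I am porting some legacy Java code written for Flink 1.5 to Flink 1.13.1. Specifically, I am using the Table API. I need to read data from a CSV file, run some basic SQL, and then write the results back to a file.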

With Flink 1.5, I used the following code to do this:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        
TableSource tableSrc = CsvTableSource.builder()
    .path("<CSV_PATH>")
    .fieldDelimiter(",")
    .field("date",Types.STRING)
    .field("month",Types.STRING)
    ...
    .build();

tableEnv.registerTableSource("CatalogTable", tableSrc);

String sql = "...";
Table result = tableEnv.sqlQuery(sql);
DataSet<Row1> resultSet = tableEnv.toDataSet(result,Row1.class);
resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-sql Example");

To port the code above to Flink 1.13.1, I wrote the following:

import org.apache.flink.table.api.Table;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(settings);

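// `date` and `month` are reserved keywords in Flink SQL, so they must be quoted with backticks
final String tableDDL = "CREATE TEMPORARY TABLE CatalogTable (" +
    "`date` STRING," +
    "`month` STRING," +
    "..." +
    ") WITH (" +
    "'connector' = 'filesystem'," +
    "'path' = 'file:///CSV_PATH'," +
    "'format' = 'csv'" +
    ")";

// The method is executeSql (camel case), not executesql
tableEnv.executeSql(tableDDL);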

String sql = "...";
Table result = tableEnv.sqlQuery(sql);

// DEPRECATED - BatchTableEnvironment required to convert Table to Dataset
BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
DataSet<Row1> resultSet = bTableEnv.toDataSet(result,Row1.class);

resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-sql Example");

However, BatchTableEnvironment is marked as deprecated in Flink 1.13. Is there another way to convert a Table to a DataSet, or to write a Table directly to a file?
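One route that avoids BatchTableEnvironment entirely, sketched here under assumptions rather than as a confirmed answer, is to write the result directly to a file: declare a second table with the filesystem connector as the sink, then run `INSERT INTO <sink> SELECT ...` through `TableEnvironment.executeSql`, which submits the job and returns a `TableResult`. Calling `await()` on that result blocks until the batch job finishes (`Table.executeInsert("<sink>")` is an equivalent alternative). The `FlinkDdl` class below is hypothetical and not part of Flink; it only builds the SQL strings, backtick-quoting column names so that reserved words such as `date` and `month` are accepted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Flink API): builds the SQL strings that would be
// passed to TableEnvironment.executeSql(...) in Flink 1.13.
final class FlinkDdl {

    private FlinkDdl() {}

    // Wraps an identifier in backticks so reserved words like `date` are allowed.
    // Names containing a backtick are rejected to avoid escaping rules entirely.
    static String quote(String identifier) {
        if (identifier.indexOf('`') >= 0) {
            throw new IllegalArgumentException("backtick in identifier: " + identifier);
        }
        return "`" + identifier + "`";
    }

    // Builds a SQL string literal, doubling any embedded single quotes.
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // CREATE TEMPORARY TABLE ... WITH ('connector' = 'filesystem', ...).
    // Usable for both the CSV source table and the output (sink) table.
    static String filesystemTable(String table, Map<String, String> columns,
                                  String path, String format) {
        StringJoiner cols = new StringJoiner(", ");
        for (Map.Entry<String, String> c : columns.entrySet()) {
            cols.add(quote(c.getKey()) + " " + c.getValue());
        }
        return "CREATE TEMPORARY TABLE " + quote(table) + " (" + cols + ") WITH ("
                + "'connector' = 'filesystem', "
                + "'path' = " + literal(path) + ", "
                + "'format' = " + literal(format) + ")";
    }

    // INSERT INTO <sink> <query>: running this with executeSql writes the query
    // result into the sink table instead of converting it to a DataSet.
    static String insertInto(String sinkTable, String query) {
        return "INSERT INTO " + quote(sinkTable) + " " + query;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>(); // keeps column order
        columns.put("date", "STRING");
        columns.put("month", "STRING");
        System.out.println(filesystemTable("OutTable", columns, "file:///OUT_PATH", "csv"));
        System.out.println(insertInto("OutTable", "SELECT * FROM CatalogTable"));
    }
}
```

In the job itself, usage would look roughly like `tableEnv.executeSql(FlinkDdl.filesystemTable("OutTable", columns, "file:///OUT_PATH", "csv"));` followed by `tableEnv.executeSql(FlinkDdl.insertInto("OutTable", sql)).await();`, where `OutTable`, `columns`, and `OUT_PATH` are placeholders. With this approach there is no `ExecutionEnvironment` and no `env.execute(...)` call: the INSERT statement itself triggers execution.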
