问题描述
我正在将一些为 Flink 1.5 版编写的遗留 Java 代码转换为 Flink 1.13.1 版。具体来说,我正在使用 Table API。我必须从 CSV 文件中读取数据,执行一些基本的 sql,然后将结果写回文件。
对于 Flink 1.5 版本,我使用以下代码来执行上述操作
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
TableSource tableSrc = CsvTableSource.builder()
.path("<CSV_PATH>")
.fieldDelimiter(",")
.field("date",Types.STRING)
.field("month",Types.STRING)
...
.build();
tableEnv.registerTableSource("CatalogTable",tableSrc);
String sql = "...";
Table result = tableEnv.sqlQuery(sql);
DataSet<Row1> resultSet = tableEnv.toDataSet(result,Row1.class);
resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-sql Example");
为了把上面的代码转换成Flink 1.13.1版本,我写了下面的代码
import org.apache.flink.table.api.Table;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(settings);
final String tableDDL = "CREATE TEMPORARY TABLE CatalogTable (" +
"date STRING," +
"month STRING," +
"..." +
") WITH (" +
"'connector' = 'filesystem'," +
"'path' = 'file:///CSV_PATH'," +
"'format' = 'csv'" +
")";
tableEnv.executesql(tableDDL);
String sql = "...";
Table result = tableEnv.sqlQuery(sql);
// DEPRECATED - BatchTableEnvironment required to convert Table to Dataset
BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
DataSet<Row1> resultSet = bTableEnv.toDataSet(result,Row1.class);
resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-sql Example");
然而,BatchTableEnvironment
在 Flink 1.13 版本中被标记为“Deprecated”。有没有其他方法可以将 Table
转换为 Dataset
或直接将 Table
写入文件?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)