——wirte by 橙心橙意橙续缘,
前言
白话系列
————————————————————————————
也就是我在写作时完全不考虑写作方面的约束,完全把自己学到的东西、以及理由和所思考的东西等等都用大白话诉说出来,这样能够让信息最大化的从自己脑子里输出并且输入到有需要的同学的脑中。PS:较为专业的地方还是会用专业口语诉说,大家放心!
白话Flink系列
————————————————————————————
主要是记录本人(国内某985研究生)在Flink基础理论阶段学习的一些所学,更重要的是一些所思所想,所参考的视频资料或者博客以及文献资料均在文末放出.由于研究生期间的课题组和研究方向与Flink接轨较多,而且Flink的学习对于想进入大厂的同学们来说也是非常的赞,所以该系列文章会随着本人学习的深入来不断修改和完善,希望大家也可以多批评指正或者提出宝贵建议。
目录
Table API和Flink sql简介
基本概念
- Flink对批处理和流处理,提供的一套
统一
的上层API。 - Table API 是一套内嵌在Java和Scala语言中的查询API,它允许以非常直观的方式
组合
来自一些关系运算符的查询 - Flink sql支持
基本的sql标准
语法。
Flink 1.10默认用的是Flink老的Table planner ,Flink 1.11以后默认用的就是blink的Table Planner
//Flink的table计划器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
//blink的Table计划器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Flink中的Table含义
- TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。
- 表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成: Catalog名、数据库(database)名和对象名。
表可以是常规的,也可以是虚拟的(视图,View)
- 常规表(Table)一般可以用来描述
外部数据
,比如文件、数据库表或消息队列的数据,也可以直接从DataStream转换而来
- 视图(View)可以从现有的表中创建,通常是 table API 或者 sql 查询的 一个
结果集
我们要将
实体的可用于执行Flink sql的表table
与Flink中Table API中的Table对象
区分开
基本示例
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 读取数据
DataStreamSource<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
// 2. 转换成POJO
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 3. 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4. 基于流创建一张表
Table dataTable = tableEnv.fromDataStream(dataStream);
// 5. 调用table API进行转换操作
Table resultTable = dataTable.select("id, temperature")
.where("id = 'sensor_1'");
// 6. 执行sql,将这张表注册一下
tableEnv.createTemporaryView("sensor", dataTable);
String sql = "select id, temperature from sensor where id = 'sensor_1'";
Table resultsqlTable = tableEnv.sqlQuery(sql);
// 将表操作的结果转化成流在标准输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultsqlTable, Row.class).print("sql");
env.execute();
}
基本程序结构
StreamTableEnvironment tableEnv = ... // 创建表的执行环境
// 创建一张表,与source连接用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 创建一张表,与sink连接用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");
———————————————————具体逻辑是由下面几步来做的———————————————————————————
// 1. 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...); //类比于source操作
// 2. 通过 SQL查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ..."); //类比于transform操作
// 3. 将结果表写入输出表中
result.insertInto("outputTable"); //类比于sink操作
详细API范例
表环境创建和配置
基于老版本Planner的表环境配置
——————————————————————
// 1.1 基于老版本planner的流处理
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// (1)无setting创建流表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// (2)有setting创建流表环境
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
.uSEOldplanner() //指定老版本,也可默认
.inStreamingMode() //流
.build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);
// 1.2 基于老版本planner的批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// (1)无setting创建批表环境,有setting同上
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);
对于老版本table planner来说,在Table API还没有做到流批统一,所以用的是两种不同的环境,分别用
BatchTableEnvironment
和StreamTableEnvironment
的create
来进行批、流表环境的创建,当然作业整体执行环境env
也必须对应。
基于新版本Blink的表环境配置
——————————————————————————————
// 1.3 基于Blink的流处理
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //采用blink planner
.inStreamingMode() //流
.build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);
// 1.4 基于Blink的批处理
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //同上
.inBatchMode() //批
.build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);
由于用的是Flink1.10的API所以默认采用的是old planner,所以要使用blink Planner需要在setting中进行设置
.useBlinkPlanner()
。
而且对于blink planner来说,底层是批流统一的,所以没有
BatchTableEnvironment
,我们要强行创建批表环境只可以通过底层的TableEnvironment
的create
方法来创建。
所以其实对于Flink1.11以后来说,不管要进行批、流表环境的创建,直接使用
StreamTableEnvironment.create()
即可。
创建表—从外部文件中读取数据
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
对于csv文件来说,在设置格式
.format()
时,可以选择老的csv格式new OldCsv()
,但即将被弃用;也可以选择new Csv()
,这样的话就需要添加上面这个依赖才可以。
// 2. 表的创建:连接外部系统,读取数据
// 2.1 读取文件
String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
tableEnv.connect( new FileSystem().path(filePath)) //连接到外部文件系统
.withFormat( new Csv()) //设置外部数据的类型
.withSchema( new Schema() //使用field来构建表结构
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable"); //向Flink注册
Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema(); //打印表结构
tableEnv.toAppendStream(inputTable, Row.class).print(); //将表转化为stream并输出,Row是默认的行数据类型
可以看到,这里我们采用的是
.from()
来从表环境中注册的table中创建Table对象,而在前面的实例中我们用的是fromfromDataStream()
来从datastream(已转换为POJO类型)中创建Table对象。
表的查询
一般的,sql中的操作都可以在Table API中得到对应
操作中的数据转换Table -> Table
// 3. 查询转换
// 3.1 Table API
// 简单转换
Table resultTable = inputTable.select("id, temp")
.filter("id === 'sensor_6'");
// 聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp"); //可以使用as重命名
// 3.2 sql
tableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
// 打印输出
// 追加流,数据随着到来会再末尾追加
tableEnv.toAppendStream(resultTable, Row.class).print("result");
// 撤回流---随着数据的到来,表里的信息是会改变的,而不是简单的追加
tableEnv.toRetractStream(aggTable, Row.class).print("agg");
tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");
表的输出—输出到文件
// 4. 输出到文件
// 连接外部文件注册输出表
String outputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\out.txt";
tableEnv.connect( new FileSystem().path(outputPath))
.withFormat( new Csv())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
// .field("cnt", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable"); //像Flink注册
resultTable.insertInto("outputTable");
//aggTable.insertInto("outputTable");
往文件里进行写入还是有一定的局限的,因为
public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row>
只支持批表和追加流2种形式的输出,而不支持动态改变的撤回流。(类似上面那样)
Table与Kafka的对接
// 2. 连接Kafka,读取数据
tableEnv.connect(new Kafka() //kafka ConnectorDescriptor kafka连接描述器
.version("0.11") // 版本
.topic("sensor") // kafka topic
.property("zookeeper.connect", "localhost:2181") //指定zookeeper
.property("bootstrap.servers", "localhost:9092") //指定bootstrap server
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
// 3. 查询转换
// 简单转换
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = sensorTable.select("id, temp")
.filter("id === 'sensor_6'");
// 聚合统计
Table aggTable = sensorTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 4. 建立kafka连接,输出到不同的topic下
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sinktest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
// .field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
resultTable.insertInto("outputTable");
由于
public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row>
,故KafkaTableSink 也不支持由聚合操作得到的表的输出,只支持追加流。
小结
——————
也就是说Table输出到文件系统
和Kafka
,都不支持更新
模式,也就是数据输出就是输出了,没有办法再次增量地更新其内部的值。下一小节我们就来介绍一下Table的更新模式。
Table更新模式(Update Mode)
对于流式查询,需要声明如何在表和外部连接器之间执行转换。
因为在流式查询当中,我们除了要做到查询和增加以外,修改、删除这些更新操作也应该支持,但是由于流式的特性,如何将已经输出到外部的数据进行更新呢?这就需要我们的更新模式提前声明好了。
与外部系统交换的消息类型,由更新模式(Update Mode)指定。 包括以下3种
- 追加(Append)模式
- 表只做插入操作,和外部连接器只交换插入(Insert)消息
- 都支持
- 撤回(Retract)模式
- 更新插入(Upsert)模式
- 更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息
要进行更新,首先就需要
支持更新的外部系统
,像我们上面介绍过的fs和kafka均不支持,但是elasticsearch
就支持。
tableEnv.connect(new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp")
)
.inUpsertMode()
.withFormat(new Json())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable");
aggResultTable.insertInto("esOutputTable");
需要引入相应的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.1</version>
</dependency>
String sinkDDL=
"create table jdbcOutputTable (" +
" id varchar(20) not null, " +
" cnt bigint not null " +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:MysqL://localhost:3306/test', " +
" 'connector.table' = 'sensor_count', " +
" 'connector.driver' = 'com.MysqL.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = '123456' )";
tableEnv.sqlUpdate(sinkDDL) // 执行 DDL创建表
aggResultsqlTable.insertInto("jdbcOutputTable");
Stream和Table的转化
- 表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就 可以继续在 Table API 或 sql 查询的结果上运行了
- 将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将 表的每一行转换成的数据类型
- 表作为流式查询的结果,是动态更新的
- 转换有两种转换模式:追加(Append)模式和撤回(Retract)模式
将 Table 转换成 DataStream
- 追加模式(Append Mode)
- 用于表只会被插入(Insert)操作更改的场景
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
- 撤回模式(Retract Mode)
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable , Row.class);
DataStream转换成 Table
DataStream<SensorReading> dataStream = ... Table sensorTable = tableEnv.fromDataStream(dataStream);
- 默认转换后的 Table schema 和 DataStream 中POJO的字段定义一一对应,也可 以单独指定出来
DataStream<SensorReading> dataStream = ... Table sensorTable = tableEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature");
Row.class
可以用于指定Flink中所有的数据类型;当然也可以手动指定,String,Tuple等.
创建临时视图
- 基于 DataStream 创建临时视图
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");
- 基于 Table 创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable);
临时视图的功能可以参考上面介绍过的。
查看执行计划
- Table API 提供了一种机制来解释计算表的逻辑和优化查询计划
- 查看执行计划,可以通过 TableEnvironment.explain(table) 方法或 TableEnvironment.explain() 方法完成,返回一个字符串,描述三个计划
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);