1、flink编程模型
1)抽象数据集
DataStream(实时):使用StreamExecutionEnvironment创建DataStream
DataSet(离线): 使用ExecutionEnvironment创建DataSet
2)编程模型
Source:从指定的数据源创建最原始的抽象数据集
Transformation:对数据集进行转换操作,返回一个新的数据集
Sink:将数据输出到指定的存储系统中
注意:job中必须有source和sink,Transformation可有可无
2、source
1)单并行的source
无论指定多少个并行度,该source对应的DataStream并行度只有1,即产生数据的subTask实例就一个,例如socketTextStream方法创建的DataStream
2)多并行的source
可以根据提交任务时指定程序的并行度,DataStreamSource可以有多个并行,即有多个读取数据的subTask实例,FlinkKafkaConsumer
3)自定义source
3.1)单并行:实现sourceFunction接口
3.2)多并行:ParallelSourceFunction接口,可以继承RichParalleSourceFunction抽象类(必须要实现run方法和cancel方法,可以选择性实现open方法和close方法)
[方法执行顺序:open(创建连接)->Run(while循环)->cancel->close(关闭连接,释放资源)]
3.3)调用addSource,将自定义的sourceFunction的实现类的实例传入到该方法中
4)flinkKafkaConsumer:
4.1)多并行的source,里面使用operator记录偏移量,开启checkpointing可以容错,
flinkKafkaConsumer对应的DataStreamSource的并行度的数量为提交任务时指定的-p,本地模式为当前机器逻辑核数量
4.2)如果DataStreamSource的并行度大于topic分区的数量,会有部分source对应的subtask是空跑,读取不到数据
4.3)如果DataStreamSource的并行度小于topic分区的数量,会有部分source对应的subtask可以读取多个分区的数量
4.4)最好是并行度和topic的分区的数量保持一致
3、 FlinkSink
sink是多并行,它的并行度与执行环境的并行度保持一致
调用完sink之后,可以再调用setparallelism设置sink的并行度
1、自定义sink 调用addSink方法,将实现sinkFunction接口或继承RichSinkFunction抽象类的实现传入到该方法中,必须重写一个invoke方法,在该方法中实现将数据写入到外部的存储系统中
自定义MyPrintSink:打印在控制台前面的编号是subtask的Index+1
可以通过getRuntimeContext().getIndexOfThisSubtask获取当前subtask的index
2、 RedisSink:结合Kafka,checkpointing可以实现数据的一致性(AtLeastOnce),因为RedisSink可以覆盖原来的数据
3 、FlinkKafkaProducer:结合Kafka,checkpointing可以实现数据的一致性(ExactlyOnce)
3.1)FlinkKafkaProducer支持事务:消费者在读取数据时,要指定事务隔离级别,只读取成功提交事务的数据(isolation.level=read_committed)
3.2)FlinkKafkaProducer继承了TwoParhaseCommitSinkFunction,实现了CheckpointingFunction和checkpointListener,可以将数据保存到operatorState中,checkpoint成功后再提交事务
4、FlinkTransformation
1)map算子
对DataStream中的数据依次取出来进行处理(做映射)
底层调用是transform方法,传入operator名称(Map),返回数据类型和StreamMap实例并将自定义的计算逻辑传入到StreamMap
StreamMap类实现了OnceinputStreamOperator接口,必须重写processElement方法,数据是封装到StreamRecord,使用Output将处理完的数据输出
StreamMap还实现了AbstractUdfStreamOperator接口,用来约束该类传入的计算逻辑的类型,(接口的类型),只能传入mapfunction类型
2) fliter算子
对数据过滤,保留计算逻辑返回为true的数据
底层调用是transform方法,传入operator名称(fliter),返回数据类型和StreamFliter实例并将自定义的过滤逻辑传入到StreamFliter
StreamMap类实现了OnceinputStreamOperator接口,必须重写processElement方法,应用过滤逻辑,返回true就使用output输出
StreamMap还实现了AbstractUdfStreamOperator接口,用来约束该类传入的计算逻辑的类型,(接口的类型)只能传入fliterFunction类型
3)FlatMap算子
对数据进行扁平化映射,可以输出0到多条数据,输出数据用collect方法
底层调用是transform方法,传入operator名称(FlatMap),返回数据类型和StreamFlatMap实例并将自定义的过滤逻辑传入到StreamFlatMap中
StreamMap类实现了OnceinputStreamOperator接口,必须重写processElement方法,应用计算逻辑,如果一条返回多条应用collect结合for循环将数据输出
StreamMap还实现了AbstractUdfStreamOperator接口,用来约束该类传入的计算逻辑的类型,(接口的类型)只能传入FlatMapFunction类型
4)KeyBy
按照key的hash对数据进行分区,可以保证key相同的一定进入到一个分区内,但是一个分区内可以有多个分区的数据
是对数据进行实时的分区,不是上有发送给下游,而是将数据写入到对应的charnnrl的缓存中,下游到上游实时的拉取
KeyBy底层是newKeyedStream ,然后将父DataStream包起来,并且传入KyeBy的条件
最终会调用keyGroupStreamPartitioner的selectChannel的方法,将keyBy的条件传入到该方法中
步骤:
1.先计算key的HashCode值
2.将key的HashCode值进行特殊的hash处理:MathUtils.murmurHash(keyHash),避免hashcode返回的数字为负
3.将返回特殊hash值模 / 最大并行度(128默认)得到keyGroupId
keyGroupId*parallelism(此程序的并行度) / maxParallelism(默认最大并行度),返回分区编号
优点:可以将数据尽量均匀分配到多个分区并且key的hashcode为负
注:
1.如果将自定义POJO当成key必须重写hashcode方法
2.不能将数组当成keyBy的key
5)reduce
将keyedStream数据进行聚合
传入reduceFunction,输入和输出的类型保持一致
如果这个key的数据是第一次出现,不会调用自定义的reduce方法
底层调用的是StreamGroupReduceOperator的processElement方法,将初始值或累加的中间结果以valueState方式保存起来,通过多态的方式调用自定义的reduce方法,将reduce方法的返回值再更新状态到valueState中,最后使用ouput将数据输出
6)sum算子
对KeyedStream的数据进行聚合
底层调用的是aggregate方法,传入SumAggregator然后再调用reduce方法,在reduce方法中会根据数据的类型,调用具体的相加方法 eg : intSum,LongSum…
7)min max
只会返回keyBy的字段和最小值、最大值,如果还有其他字段返回的是第一次出现的值
8)minBy maxBy
不仅返回KeyBy的字段还会返回最小值、最大值,如果有多个字段还会返回最小值、最大值所在数据的全部字段
9)union算子
将多个类型一样的DateStream合并到一起,使用统一的方式进行处理,可以union一到多个DataStream,如果自己union自己是将数据double
10)connect算子
可以将两个不同类型的数据包装到一起,分别调用两个方法对两个数据流中的数据进行操作,可以让两个数据流共享状态
11)iterate
用来做迭代计算,类似一个分布式for循环
可以指定一个更新模型,对输入的数据进行运算;可以指定两个判断条件:继续迭代的条件和退出迭代的条件(输出数据的条件)
12)startNewChaining
从这个算子开始开启一个新链
13)disableChaining
将该算子的前面和后面的链都断开
14)slotSharingGroup
设置槽的名称,有就近跟随原则,如果前面的task被打上共享资源槽名称,后面的也跟随
可以将一些task调度到指定的槽内,比如计算密集型的
15)物理分区
Hash (keyBy)
随机(shuffle)
轮循 (rebalance)
在一个TaskManager中轮循rescaling
广播 (broadcast)
自定义 (partitionCustom)
16)、project (投影)
只能针对于tuple类型的数据
5、window
1)根据并行度分类
1.1)keyedWindow 可以1到多个并行
1.2) NokeyedWindow 并行度只有1
2)GlobalWindow
countwindow :只有countTrigger
3)TimeWindow
按照时间类型划分:processingTimeWindow和eventTimeWindow
按照划分的方式:滚动窗口、滑动窗口、会话窗口