一、Source-单并行的Source
代码:
package cn._51doit.flink.day01;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* Source-单并行的Source
*/
public class FromCollectionDemo_01 {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181); //设置web ui的端口
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//有限的数据流(用fromCollection创建的并行度)
DataStreamSource<Integer> nums = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
int parallelism = nums.getParallelism();
System.out.println("fromCollection创建的并行度的DataStreamSource为:"+parallelism);
nums.print();
env.execute();
}
}
控制台打印输出:
验证结果1
基于集合的Source
- 基于集合的Source是将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据集DataStreamSource,它是DataStream的子类
- 所以也可以使用DataStream类型来引用。得到DataStream后就可以调用Transformation或Sink度数据进行处理了
fromCollection方法
验证结果2
- fromElements创建的并行度的DataStream为1,也是一个有限的数据流,执行程序就完全退出,通常用来做实验,程序执行完毕就退出
- fromElements(T …) 方法是一个非并行的Source
- 可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource
源码读解
从fromElements方法进去看,底层调的是addSource方法,它的并行度是1
在fromElementsFunction方法进去看,是实现了SourceFunction,它是用来读数据的
二、多并行的Source
(1)fromParallelCollection创建的Source是多并行的Source,并且是一个有限的数据流
代码演示:
package cn._51doit.flink.day01;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;
/**
* Source-多并行的Source
*/
public class FromParCollection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> nums = env.fromParallelCollection(new NumberSequenceIterator(1L, 20L),Long.class);
int parallelism = nums.getParallelism();
System.out.println("fromParallelCollection创建的DataStream的并行度为:"+parallelism);
nums.print();
env.execute();
}
}
它是一个多个的并行度,而且是一个有限的数据流,运行得出:它的并行度为4,有限的数据流,如下控制台打印的列表
fromParallelCollection(SplittableIterator, Class) 方法
- 是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数
- 第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型
(2)generateSequence创建的Source也是多并行的Source,并且是一个有限的数据流
代码演示
package cn._51doit.flink.day01;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;
/**
* Source-多并行的Source
*/
public class GenSeqDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> nums = env.generateSequence(1, 100);
int parallelism = nums.getParallelism();
System.out.println("generateSequence创建的DataStream的并行度为:"+parallelism);
nums.print();
env.execute();
}
}
运行得出:它的并行度为4,有限的数据流,如下控制台打印的列表:
3> 3
3> 7
3> 11
3> 15
3> 19
3> 23
3> 27
3> 31
3> 35
3> 39
3> 43
3> 47
3> 51
3> 55
3> 59
2> 2
2> 6
2> 10
2> 14
2> 18
2> 22
2> 26
2> 30
2> 34
2> 38
2> 42
2> 46
2> 50
2> 54
2> 58
2> 62
2> 66
2> 70
2> 74
2> 78
2> 82
2> 86
2> 90
2> 94
2> 98
4> 4
1> 1
1> 5
1> 9
1> 13
1> 17
1> 21
1> 25
1> 29
1> 33
1> 37
1> 41
1> 45
1> 49
1> 53
1> 57
1> 61
1> 65
1> 69
1> 73
1> 77
1> 81
1> 85
1> 89
1> 93
1> 97
3> 63
3> 67
3> 71
3> 75
3> 79
3> 83
3> 87
3> 91
3> 95
3> 99
4> 8
4> 12
4> 16
4> 20
4> 24
4> 28
4> 32
4> 36
4> 40
4> 44
4> 48
4> 52
4> 56
4> 60
4> 64
4> 68
4> 72
4> 76
4> 80
4> 84
4> 88
4> 92
4> 96
4> 100
generateSequence(long from, long to) 方法
- 是一个并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)该方法需要传入两个long类型的参数
- 第一个是起始值,第二个是结束值,返回一个DataStreamSource
源码读解
从fromParallelCollection方法点进去查看,返回fromParallelCollection方法点进去查看,底层调用也是addSource方法,还new FromSplittableIteratorFunction,它继承了RichParallelSourceFunction(抽象类),实现了ParallelSourceFunction方法
总结
单并行的Source:直接实现了SourceFunction接口
多并行的Source:可以洗出RichParallelSourceFunction或实现ParallelSourceFunction接口