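I. First, configure the Flink dependencies. Only flink-clients is declared here; it already pulls in flink-java, flink-streaming-java, and related artifacts transitively, so they do not need to be declared separately.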
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>
The dependencies of the flink-clients package are as follows; they include flink-java, flink-streaming-java, and others.
II. union in detail
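1. Flink's union resembles UNION ALL in SQL. Just as UNION ALL combines rows selected with the same columns from two tables into a single result without removing duplicates, union merges two or more streams of the same element type into one stream of that type.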
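The UNION ALL analogy can be modelled in plain Java. The sketch below is not Flink code; `UnionAllSketch` and `unionAll` are hypothetical names used only for illustration. It shows the two properties that carry over to Flink's union: both inputs share one element type, and nothing is deduplicated. Unlike this list-based model, a real stream union gives no guarantee about how elements from the two inputs interleave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy for UNION ALL semantics; this is not the Flink API.
class UnionAllSketch {

    // Both inputs share the element type T, mirroring union's same-type requirement.
    static <T> List<T> unionAll(List<T> first, List<T> second) {
        List<T> merged = new ArrayList<>(first);
        merged.addAll(second); // every element is kept, duplicates included
        return merged;
    }

    public static void main(String[] args) {
        List<String> merged = unionAll(
                Arrays.asList("1001", "1002"),
                Arrays.asList("1002", "2001"));
        System.out.println(merged); // "1002" is present twice
    }
}
```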
2. Define the UserInfo class
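import lombok.Data;

// Lombok's @Data generates getters, setters, toString, equals and hashCode
@Data
public class UserInfo {
    private String id;
    private String name;
    private String gender;
    private Integer age;
    private Integer timestamp;

    // Public no-argument constructor, needed for Flink to treat the class as a POJO
    public UserInfo() {
    }

    public UserInfo(String id, String name, String gender, Integer age, Integer timestamp) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.age = age;
        this.timestamp = timestamp;
    }
}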
3. The Flink program
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class UnionTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First stream of UserInfo beans
        SingleOutputStreamOperator<UserInfo> stream1 = env.fromElements(
                new UserInfo("1001", "小张", "男", 21, 1),
                new UserInfo("1002", "小红", "女", 18, 2)
        );

        // Second stream of UserInfo beans
        SingleOutputStreamOperator<UserInfo> stream2 = env.fromElements(
                new UserInfo("2001", "老张", "男", 50, 4),
                new UserInfo("2002", "老红", "女", 53, 5),
                new UserInfo("2003", "老王", "男", 54, 6)
        );

        // Merge the two streams with union and print each element as a string
        stream1.union(stream2)
                .process(new ProcessFunction<UserInfo, String>() {
                    @Override
                    public void processElement(UserInfo value, Context ctx, Collector<String> out) throws Exception {
                        out.collect(value.toString());
                    }
                })
                .print();

        // Start the job
        env.execute();
    }
}
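4. Result:
III. connect in detail
connect can be used in two ways:
1. The first behaves like union: every record that goes in comes out, one for one. Unlike union, the two connected streams may have different element types, here Person and PersonDetail.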
// Imports needed by this snippet
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Person stream
SingleOutputStreamOperator<Person> stream1 = env.fromElements(
        new Person("1002", 21, 2),
        new Person("1003", 32, 3),
        new Person("1004", 41, 4)
);

// PersonDetail stream
SingleOutputStreamOperator<PersonDetail> stream2 = env.fromElements(
        new PersonDetail("1001", "zhangsan", "man", 2),
        new PersonDetail("1002", "lisi", "woman", 3),
        new PersonDetail("1003", "wangwu", "man", 4),
        new PersonDetail("1004", "shugj", "man", 5)
);

// First way: similar to UNION ALL, every record in produces one record out
// No keyBy is applied before connect
stream1.connect(stream2)
        .process(new CoProcessFunction<Person, PersonDetail, Tuple2<String, String>>() {
            @Override
            public void processElement1(Person value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                // id and age are packed into the tuple purely for demonstration
                out.collect(new Tuple2<>(value.getId(), value.getAge().toString()));
            }

            @Override
            public void processElement2(PersonDetail value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                // id and name are packed into the tuple purely for demonstration
                out.collect(new Tuple2<>(value.getId(), value.getName()));
            }
        }).print();

env.execute();
Output:
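The Person and PersonDetail classes used in the connect examples are not shown in the article. The following is a minimal sketch of what they might look like, written without Lombok so it compiles on its own. The field names are assumptions inferred from the constructor calls and getters that appear in the examples; in particular, `gender` for the third PersonDetail argument is a guess by analogy with UserInfo, and `Integer` for `timestamp` follows UserInfo as well.

```java
// Hypothetical definitions; the article does not show these classes.
// In a real Flink project each class would be public and live in its own file.
class Person {
    private String id;
    private Integer age;
    private Integer timestamp;

    Person() {
    }

    Person(String id, Integer age, Integer timestamp) {
        this.id = id;
        this.age = age;
        this.timestamp = timestamp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public Integer getTimestamp() { return timestamp; }
    public void setTimestamp(Integer timestamp) { this.timestamp = timestamp; }
}

class PersonDetail {
    private String id;
    private String name;
    private String gender; // assumed field name
    private Integer timestamp;

    PersonDetail() {
    }

    PersonDetail(String id, String name, String gender, Integer timestamp) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.timestamp = timestamp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getTimestamp() { return timestamp; }
    public void setTimestamp(Integer timestamp) { this.timestamp = timestamp; }
}
```

With Lombok on the classpath, as for UserInfo, `@Data` could replace the hand-written accessors.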
2. The second behaves like a join: only records that match on an equal key are emitted.
// Imports needed by this snippet
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Person stream
SingleOutputStreamOperator<Person> stream1 = env.fromElements(
        new Person("1002", 21, 2),
        new Person("1003", 32, 3),
        new Person("1004", 41, 4)
);

// PersonDetail stream
SingleOutputStreamOperator<PersonDetail> stream2 = env.fromElements(
        new PersonDetail("1001", "zhangsan", "man", 2),
        new PersonDetail("1002", "lisi", "woman", 3),
        new PersonDetail("1003", "wangwu", "man", 4),
        new PersonDetail("1004", "shugj", "man", 5)
);

// Second way: similar to a join, only records with matching keys are emitted
// Both streams are keyed by id before connect
stream1.keyBy(data -> data.getId())
        .connect(stream2.keyBy(data -> data.getId()))
        .process(new KeyedCoProcessFunction<String, Person, PersonDetail, Tuple2<Person, PersonDetail>>() {
            ValueState<Person> personState;
            ValueState<PersonDetail> personDetailState;

            @Override
            public void open(Configuration parameters) throws Exception {
                personState = getRuntimeContext().getState(
                        new ValueStateDescriptor<Person>("person", Person.class));
                personDetailState = getRuntimeContext().getState(
                        new ValueStateDescriptor<PersonDetail>("person-detail", PersonDetail.class));
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Person, PersonDetail>> out) throws Exception {
                // The other side never arrived in time: drop whatever is buffered for this key
                personState.clear();
                personDetailState.clear();
            }

            @Override
            public void processElement1(Person value, Context ctx, Collector<Tuple2<Person, PersonDetail>> out) throws Exception {
                // Look up the other side's state
                PersonDetail personDetail = personDetailState.value();
                if (personDetail != null) {
                    // A PersonDetail is already buffered: emit the pair and clear the state
                    out.collect(new Tuple2<>(value, personDetail));
                    personState.clear();
                    personDetailState.clear();
                } else {
                    // No PersonDetail yet: buffer this Person
                    personState.update(value);
                    // Register a timer 10 minutes after the current processing time;
                    // if the other side has not arrived by then, onTimer clears the state
                    long cleanupTime = ctx.timerService().currentProcessingTime() + 10 * 60 * 1000L;
                    ctx.timerService().registerProcessingTimeTimer(cleanupTime);
                }
            }

            @Override
            public void processElement2(PersonDetail value, Context ctx, Collector<Tuple2<Person, PersonDetail>> out) throws Exception {
                // Look up the other side's state
                Person person = personState.value();
                if (person != null) {
                    // A Person is already buffered: emit the pair and clear the state
                    out.collect(new Tuple2<>(person, value));
                    personState.clear();
                    personDetailState.clear();
                } else {
                    // No Person yet: buffer this PersonDetail
                    personDetailState.update(value);
                    // Register a timer 10 minutes after the current processing time;
                    // if the other side has not arrived by then, onTimer clears the state
                    long cleanupTime = ctx.timerService().currentProcessingTime() + 10 * 60 * 1000L;
                    ctx.timerService().registerProcessingTimeTimer(cleanupTime);
                }
            }
        }).print();

env.execute();
Output:
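To follow the matching logic of the keyed connect outside Flink, it can be modelled in plain Java. The sketch below is not Flink code: `KeyedJoinSketch` and its methods are hypothetical names, each `HashMap` stands in for one keyed `ValueState`, records are reduced to an id plus one field, and the 10-minute cleanup timer is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two-state matching logic; this is not the Flink API.
class KeyedJoinSketch {
    // Stand-ins for personState and personDetailState, one slot per key (id)
    private final Map<String, Integer> pendingAges = new HashMap<>();
    private final Map<String, String> pendingNames = new HashMap<>();
    // Collected output, one "id:age:name" string per matched pair
    final List<String> matched = new ArrayList<>();

    // Mirrors processElement1: a Person (id, age) arrives
    void onPerson(String id, int age) {
        String name = pendingNames.remove(id);
        if (name != null) {
            matched.add(id + ":" + age + ":" + name); // other side was waiting: emit
        } else {
            pendingAges.put(id, age);                 // otherwise buffer this side
        }
    }

    // Mirrors processElement2: a PersonDetail (id, name) arrives
    void onDetail(String id, String name) {
        Integer age = pendingAges.remove(id);
        if (age != null) {
            matched.add(id + ":" + age + ":" + name);
        } else {
            pendingNames.put(id, name);
        }
    }

    // True while a record for this id is buffered and still unmatched
    boolean isWaiting(String id) {
        return pendingAges.containsKey(id) || pendingNames.containsKey(id);
    }

    public static void main(String[] args) {
        KeyedJoinSketch join = new KeyedJoinSketch();
        join.onPerson("1002", 21);
        join.onPerson("1003", 32);
        join.onPerson("1004", 41);
        join.onDetail("1001", "zhangsan");
        join.onDetail("1002", "lisi");
        join.onDetail("1003", "wangwu");
        join.onDetail("1004", "shugj");
        System.out.println(join.matched);
        System.out.println(join.isWaiting("1001"));
    }
}
```

In this model, fed with the ids from the sample data, 1002, 1003 and 1004 each find a partner, while 1001 has no matching Person and stays buffered. In the Flink job that leftover state is what the timer is meant to clean up.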