Flink union and connect: study notes

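I. First, add the Flink dependency. Only flink-clients is declared here; it already pulls in flink-java, flink-streaming-java and other modules transitively, so they don't need to be declared separately. Lombok is added to generate getters, setters and toString() for the data classes.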

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>

The dependency tree of flink-clients (for example, as printed by mvn dependency:tree) includes flink-java, flink-streaming-java and related modules.

II. union in detail

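1. Flink's union is similar to UNION ALL in SQL. UNION ALL stacks the rows of two queries that select the same columns into one result without removing duplicates; likewise, Flink's union merges two (or more) streams with the same element type into a single stream of that type, keeping every record.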
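To make the "same type, nothing dropped" rule concrete, here is a minimal plain-Java model (no Flink involved) of what union does to the records. The class name UnionModel is hypothetical. One simplification to keep in mind: the model concatenates the inputs in order, whereas a real Flink union gives no guarantee about how records from the different inputs are interleaved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Flink) of union semantics; UnionModel is a hypothetical name.
class UnionModel {
    // Mirrors DataStream<T>.union(DataStream<T>... others): every input has the same element type T
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> result = new ArrayList<>(first);
        for (List<T> other : others) {
            result.addAll(other); // duplicates are kept, as with SQL UNION ALL
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> merged = union(Arrays.asList("1001", "1002"), Arrays.asList("1002", "2001"));
        System.out.println(merged); // [1001, 1002, 1002, 2001]
    }
}
```

Because both parameters share the type variable T, trying to union a list of UserInfo with a list of some other type fails to compile, just as DataStream.union does.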

2. Define the UserInfo class

import lombok.Data;

@Data
public class UserInfo {
    private String id;
    private String name;
    private String gender;
    private Integer age;
    private Integer timestamp;

    public UserInfo() {
    }

    public UserInfo(String id, String name, String gender, Integer age, Integer timestamp) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.age = age;
        this.timestamp = timestamp;
    }
}
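UserInfo relies on Lombok's @Data to generate getters, setters, equals/hashCode and toString(). For readers not using Lombok, a rough hand-written equivalent is sketched below; it is hypothetical, and Lombok's generated equals/hashCode differ in detail. It also follows Flink's POJO rules (public no-argument constructor, private fields with getters and setters). The class is package-private here only so the snippet compiles as a single file; in a project it would be public in its own UserInfo.java.

```java
import java.util.Objects;

// Hypothetical hand-written stand-in for the Lombok @Data version of UserInfo.
// Declare it public in UserInfo.java in a real project so Flink treats it as a POJO.
class UserInfo {
    private String id;
    private String name;
    private String gender;
    private Integer age;
    private Integer timestamp;

    // Public no-argument constructor, required by Flink's POJO serializer
    public UserInfo() {
    }

    public UserInfo(String id, String name, String gender, Integer age, Integer timestamp) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.age = age;
        this.timestamp = timestamp;
    }

    // Getters and setters, as @Data would generate them
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public Integer getTimestamp() { return timestamp; }
    public void setTimestamp(Integer timestamp) { this.timestamp = timestamp; }

    // Same "ClassName(field=value, ...)" format that Lombok's toString() produces
    @Override
    public String toString() {
        return "UserInfo(id=" + id + ", name=" + name + ", gender=" + gender
                + ", age=" + age + ", timestamp=" + timestamp + ")";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof UserInfo)) return false;
        UserInfo other = (UserInfo) o;
        return Objects.equals(id, other.id) && Objects.equals(name, other.name)
                && Objects.equals(gender, other.gender) && Objects.equals(age, other.age)
                && Objects.equals(timestamp, other.timestamp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, gender, age, timestamp);
    }
}
```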

3. Flink program

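import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class UnionTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // First stream of UserInfo Java beans
        SingleOutputStreamOperator<UserInfo> stream1 = env.fromElements(
                new UserInfo("1001", "Xiao Zhang", "male", 21, 1),
                new UserInfo("1002", "Xiao Hong", "female", 18, 2)
        );
        // Second stream with the same element type, UserInfo
        SingleOutputStreamOperator<UserInfo> stream2 = env.fromElements(
                new UserInfo("2001", "Lao Zhang", "male", 50, 4),
                new UserInfo("2002", "Lao Hong", "female", 53, 5),
                new UserInfo("2003", "Lao Wang", "male", 54, 6)
        );

        // union: both streams must have the same type; the result is one DataStream<UserInfo>
        stream1.union(stream2)
                .process(new ProcessFunction<UserInfo, String>() {
                    @Override
                    public void processElement(UserInfo value, Context ctx, Collector<String> out) throws Exception {
                        // Emit each record as its Lombok-generated toString()
                        out.collect(value.toString());
                    }
                })
                .print();

        // Start the job
        env.execute();
    }
}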

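4. Result: all five UserInfo records from the two streams are printed, one toString() line each; union neither drops nor deduplicates records, but the relative order of records coming from the two inputs is not guaranteed.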

III. connect in detail

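connect can be used in two ways. Unlike union, connect always combines exactly two streams, and their element types may differ; the resulting ConnectedStreams is handled by a function with one method per input, such as processElement1 and processElement2 in a CoProcessFunction.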
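Both examples below use Person and PersonDetail classes whose definitions are not shown. Judging from the constructor calls and the getters used (getId, getAge, getName, getTimestamp), a plausible plain-Java shape is sketched here. The field name gender (for "man"/"woman") and the Integer type of timestamp are assumptions; Integer is chosen because calls like new Person("1002", 21, 2) pass an int literal, which would not auto-box to Long.

```java
// Hypothetical reconstruction of the two data classes used by the connect examples.
// Field "gender" and the Integer timestamp type are assumptions, not from the original.
class Person {
    private String id;
    private Integer age;
    private Integer timestamp;

    public Person() {
    }

    public Person(String id, Integer age, Integer timestamp) {
        this.id = id;
        this.age = age;
        this.timestamp = timestamp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public Integer getTimestamp() { return timestamp; }
    public void setTimestamp(Integer timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "Person(id=" + id + ", age=" + age + ", timestamp=" + timestamp + ")";
    }
}

class PersonDetail {
    private String id;
    private String name;
    private String gender; // assumed name for the "man"/"woman" field
    private Integer timestamp;

    public PersonDetail() {
    }

    public PersonDetail(String id, String name, String gender, Integer timestamp) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.timestamp = timestamp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getTimestamp() { return timestamp; }
    public void setTimestamp(Integer timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "PersonDetail(id=" + id + ", name=" + name + ", gender=" + gender + ", timestamp=" + timestamp + ")";
    }
}
```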

1. The first way behaves like union: every input record produces exactly one output record.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Inside main(), with env created as in UnionTest
// Person data
SingleOutputStreamOperator<Person> stream1 = env.fromElements(
        new Person("1002", 21, 2),
        new Person("1003", 32, 3),
        new Person("1004", 41, 4)
);

// PersonDetail data
SingleOutputStreamOperator<PersonDetail> stream2 = env.fromElements(
        new PersonDetail("1001", "zhangsan", "man", 2),
        new PersonDetail("1002", "lisi", "woman", 3),
        new PersonDetail("1003", "wangwu", "man", 4),
        new PersonDetail("1004", "shugj", "man", 5)
);

// Way 1, similar to UNION ALL: one output record per input record
// No keyBy is needed
stream1.connect(stream2)
        .process(new CoProcessFunction<Person, PersonDetail, Tuple2<String, String>>() {
            @Override
            public void processElement1(Person value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                // Called for each Person: emit (id, age), just for demonstration
                out.collect(new Tuple2<>(value.getId(), value.getAge().toString()));
            }

            @Override
            public void processElement2(PersonDetail value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                // Called for each PersonDetail: emit (id, name), just for demonstration
                out.collect(new Tuple2<>(value.getId(), value.getName()));
            }
        }).print();
env.execute();

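Output: seven tuples in total, three (id, age) pairs from stream1 and four (id, name) pairs from stream2; nothing is matched or filtered out.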
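The un-keyed connect above can be modeled in plain Java (no Flink) to show why seven records come out: each input keeps its own element type and its own handler, and both handlers produce the same output type. The class ConnectModel and its parameters are hypothetical names; the two Function arguments stand in for processElement1 and processElement2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model (not Flink) of connect + CoProcessFunction without keyBy.
class ConnectModel {
    static <A, B, R> List<R> connect(List<A> input1, List<B> input2,
                                     Function<A, R> processElement1,
                                     Function<B, R> processElement2) {
        List<R> out = new ArrayList<>();
        for (A a : input1) out.add(processElement1.apply(a)); // one output per record of input 1
        for (B b : input2) out.add(processElement2.apply(b)); // one output per record of input 2
        return out;
    }

    public static void main(String[] args) {
        // Input 1 holds Integers (ages), input 2 holds Strings (names); both map to String
        List<String> out = connect(
                Arrays.asList(21, 32, 41),
                Arrays.asList("zhangsan", "lisi", "wangwu", "shugj"),
                age -> "age=" + age,
                name -> "name=" + name);
        System.out.println(out.size()); // 7
    }
}
```

As with the Flink version, the model only fixes how many records come out; in Flink the two inputs are processed as records arrive, so their interleaving in the printed output is not deterministic.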

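2. The second way behaves like an inner join on equal keys: both streams are keyed by id, and a pair is emitted only when records with the same id have arrived on both sides.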
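The buffering logic behind this keyed connect can be modeled in plain Java (no Flink). In this sketch two HashMaps stand in for the per-key ValueStates: whichever side arrives first is buffered under its key, and when the other side arrives the pair is emitted and both buffers for that key are cleared. The class KeyedJoinModel, its methods onLeft/onRight and the "id:left+right" output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink) of the per-key buffering join in a KeyedCoProcessFunction.
class KeyedJoinModel {
    private final Map<String, String> leftState = new HashMap<>();  // plays the role of personState
    private final Map<String, String> rightState = new HashMap<>(); // plays the role of personDetailState
    final List<String> output = new ArrayList<>();

    // Like processElement1: a record from the first stream with key id
    void onLeft(String id, String value) {
        String other = rightState.remove(id);
        if (other != null) {
            output.add(id + ":" + value + "+" + other); // match: emit the pair
            leftState.remove(id);                       // and clear both states for this key
        } else {
            leftState.put(id, value);                   // buffer until the other side arrives
        }
    }

    // Like processElement2: a record from the second stream with key id
    void onRight(String id, String value) {
        String other = leftState.remove(id);
        if (other != null) {
            output.add(id + ":" + other + "+" + value);
            rightState.remove(id);
        } else {
            rightState.put(id, value);
        }
    }

    // Records still waiting for a partner (a cleanup timer would clear these)
    int pending() {
        return leftState.size() + rightState.size();
    }

    public static void main(String[] args) {
        KeyedJoinModel join = new KeyedJoinModel();
        join.onRight("1001", "zhangsan");
        join.onRight("1002", "lisi");
        join.onLeft("1002", "21");
        join.onLeft("1003", "32");
        join.onLeft("1004", "41");
        join.onRight("1003", "wangwu");
        join.onRight("1004", "shugj");
        System.out.println(join.output);    // [1002:21+lisi, 1003:32+wangwu, 1004:41+shugj]
        System.out.println(join.pending()); // 1 (id 1001 never finds a partner)
    }
}
```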

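import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Inside main(), with env created as in UnionTest
// Person data
SingleOutputStreamOperator<Person> stream1 = env.fromElements(
        new Person("1002", 21, 2),
        new Person("1003", 32, 3),
        new Person("1004", 41, 4)
);

// PersonDetail data
SingleOutputStreamOperator<PersonDetail> stream2 = env.fromElements(
        new PersonDetail("1001", "zhangsan", "man", 2),
        new PersonDetail("1002", "lisi", "woman", 3),
        new PersonDetail("1003", "wangwu", "man", 4),
        new PersonDetail("1004", "shugj", "man", 5)
);

// Way 2, similar to a join: only records whose ids match on both sides are emitted
// Key both streams by id so that records with the same id share the same keyed state
stream1.keyBy(data -> data.getId())
        .connect(stream2.keyBy(data -> data.getId()))
        .process(new KeyedCoProcessFunction<String, Person, PersonDetail, Tuple2<Person, PersonDetail>>() {
            // Per-key buffers for whichever side arrives first
            ValueState<Person> personState;
            ValueState<PersonDetail> personDetailState;

            @Override
            public void open(Configuration parameters) throws Exception {
                personState = getRuntimeContext().getState(new ValueStateDescriptor<Person>("person", Person.class));
                personDetailState = getRuntimeContext().getState(new ValueStateDescriptor<PersonDetail>("person-detail", PersonDetail.class));
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Person, PersonDetail>> out) throws Exception {
                // The other side did not arrive in time: drop what is buffered for this key.
                // Timers are not deleted after a match, so a stale timer can also clear state buffered later for the same key.
                personState.clear();
                personDetailState.clear();
            }

            @Override
            public void processElement1(Person value, Context ctx, Collector<Tuple2<Person, PersonDetail>> out) throws Exception {
                // Look up the buffered record from the other stream
                PersonDetail personDetail = personDetailState.value();

                if (personDetail != null) {
                    // A PersonDetail is waiting: emit the pair and clear the state
                    out.collect(new Tuple2<>(value, personDetail));
                    personState.clear();
                    personDetailState.clear();
                } else {
                    // No PersonDetail yet: buffer this Person
                    personState.update(value);

                    // Register a timer 10 minutes after the current processing time;
                    // if the other side has still not arrived by then, onTimer clears the state
                    ctx.timerService().registerProcessingTimeTimer(
                            ctx.timerService().currentProcessingTime() + 10 * 60 * 1000L);
                }
            }

            @Override
            public void processElement2(PersonDetail value, Context ctx, Collector<Tuple2<Person, PersonDetail>> out) throws Exception {
                // Look up the buffered record from the other stream
                Person person = personState.value();

                if (person != null) {
                    // A Person is waiting: emit the pair and clear the state
                    out.collect(new Tuple2<>(person, value));
                    personState.clear();
                    personDetailState.clear();
                } else {
                    // No Person yet: buffer this PersonDetail
                    personDetailState.update(value);

                    // Register a timer 10 minutes after the current processing time;
                    // if the other side has still not arrived by then, onTimer clears the state
                    ctx.timerService().registerProcessingTimeTimer(
                            ctx.timerService().currentProcessingTime() + 10 * 60 * 1000L);
                }
            }
        }).print();
env.execute();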

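Output: ids 1002, 1003 and 1004 appear in both streams, so three (Person, PersonDetail) tuples are expected; PersonDetail 1001 has no matching Person and is never emitted.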
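A note on the cleanup timer: processing-time timers are compared against the machine's wall clock in epoch milliseconds, so the timer time should be based on ctx.timerService().currentProcessingTime(). If it is computed from value.getTimestamp() instead (1 to 5 in this sample data), the result (about 600,000 ms after the epoch) is already in the past, and Flink fires such a timer almost immediately, which can clear buffered state before the partner record arrives. The plain-Java model below (no Flink) illustrates the difference; TimerCleanupModel and the clock value in it are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink) of per-key cleanup timers; names and clock value are hypothetical.
class TimerCleanupModel {
    static final long TIMEOUT_MS = 10 * 60 * 1000L; // 10 minutes, as in the example above
    private final Map<String, Long> timers = new HashMap<>(); // key -> timer time (epoch ms)

    // Like buffering a record and calling registerProcessingTimeTimer(timerTime)
    void buffer(String key, long timerTime) {
        timers.put(key, timerTime);
    }

    // Like the processing-time service reaching `now`: every timer with time <= now fires,
    // and onTimer clears the buffered state for that key
    void advanceTo(long now) {
        timers.values().removeIf(time -> time <= now);
    }

    boolean isBuffered(String key) {
        return timers.containsKey(key);
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L; // hypothetical current processing time
        TimerCleanupModel model = new TimerCleanupModel();
        model.buffer("1002", 2 + TIMEOUT_MS);   // element timestamp 2 + 10 min: long in the past
        model.buffer("1003", now + TIMEOUT_MS); // current processing time + 10 min
        model.advanceTo(now);
        System.out.println(model.isBuffered("1002") + " " + model.isBuffered("1003")); // false true
    }
}
```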
