Flink CDC data synchronization: PostgreSQL to StarRocks

Environment preparation

StarRocks

  • Cluster overview

    Role    IP                                                   Setup       Notes
    FE      192.168.110.170, 192.168.110.171, 192.168.110.172   3 follower  a single FE is enough for testing
    BE      192.168.110.170, 192.168.110.171, 192.168.110.172   3 BE        a single BE is enough for testing
    broker  192.168.110.170, 192.168.110.171, 192.168.110.172   3 broker    broker can be skipped for testing
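
    Once all three nodes are up, it is worth confirming that every FE and BE has joined the cluster before wiring up Flink. A minimal check, assuming you connect to any FE with a MySQL client (adjust host and query port to your deployment):

     SHOW FRONTENDS;  -- each FE should report Alive = true
     SHOW BACKENDS;   -- each BE should report Alive = true
     SHOW BROKER;     -- only relevant if brokers are deployed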

PostgreSQL

  • Installation

    Deploy with Docker

     # pull the image
     docker pull postgres
     # start PostgreSQL
     docker run --name mypostgres -d -p 5432:5432 -e POSTGRES_PASSWORD=123456 postgres
    
  • Modify the configuration

    # enter the container
    docker exec -it mypostgres /bin/bash
    
    # install vim (not shipped with the image)
    apt-get update
    apt-get install vim
    
    # locate the configuration file
    find / -name postgresql.conf
    
    # edit the configuration file
    vi /var/lib/postgresql/data/postgresql.conf
    
    
    • Change the following settings

      # change the WAL level to logical
      wal_level = logical # minimal, replica, or logical
      
      # raise the maximum number of replication slots (default 10); Flink CDC uses one slot per table
      max_replication_slots = 20 # max number of replication slots
      
      # raise the maximum number of WAL sender processes (default 10); keep this in line with max_replication_slots
      max_wal_senders = 20 # max number of walsender processes
      # terminate replication connections that have been inactive longer than this; a larger value is safer (default 60s)
      wal_sender_timeout = 180s # in milliseconds; 0 disables
      

      wal_level is the only setting that must be changed; the other parameters can be adjusted as needed.

      After editing postgresql.conf, restart the PostgreSQL service for the changes to take effect.

    • Restart the service

      # after exiting the container
      docker restart mypostgres
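
      After the restart, it is worth confirming that the new settings are active. A minimal check, run with psql -U postgres inside the container:

      SHOW wal_level;              -- should return logical
      SHOW max_replication_slots;  -- should return 20
      SHOW max_wal_senders;        -- should return 20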
      
  • Create a PostgreSQL user and grant it replication privileges

    -- create a new user
    CREATE USER pgcdc WITH PASSWORD '123456';
    -- grant the user replication privileges
    ALTER ROLE pgcdc replication;
    
    -- create a database
    create database test;
    
    -- allow the user to connect to the database
    grant CONNECT ON DATABASE test to pgcdc;
    
    -- grant the user SELECT on all existing tables in the public schema
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO pgcdc;
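
    The GRANT above only covers tables that already exist in public. If more tables will be added later, an optional extra statement (a sketch, assuming the future tables are created by the same superuser that runs it) avoids having to re-grant:

    -- optional: also let pgcdc read tables created in public in the future
    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO pgcdc;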
    
    
    
  • Publish the tables

    -- create the table
    CREATE TABLE testcdc(
                               ID INT PRIMARY KEY      NOT NULL,
                               DEPT           CHAR(50) NOT NULL,
                               EMP_ID         INT      NOT NULL
    );
    
    -- set puballtables to true for any existing publication
    update pg_publication set puballtables=true where pubname is not null;
    -- publish all tables
    CREATE PUBLICATION dbz_publication FOR ALL TABLES;
    -- check which tables have been published
    select * from pg_publication_tables;
    
    
  • Change the table's replica identity so updates and deletes carry the before-values

    -- make the replica identity include the before-values of updated and deleted rows
    ALTER TABLE testcdc REPLICA IDENTITY FULL;
    -- check the replica identity ('f' means FULL, i.e. the change took effect)
    select relreplident from pg_class where relname='testcdc';
    

At this point the PostgreSQL side is fully prepared; all of the steps above are required.

Writing the code

Maven dependencies

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.binary.version>2.11</scala.binary.version>
        <debezium.version>1.5.4.Final</debezium.version>
        <flink.version>1.13.6</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.2_flink-1.13_2.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

Java code

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PgsqlToStarRocksTest {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        // get the Flink streaming execution environment
        StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
        exeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        exeEnv.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // local checkpoint directory as a file:// URI
        FsStateBackend stateBackend = new FsStateBackend("file:///D:/fsdata");
        exeEnv.setStateBackend(stateBackend);
        // exeEnv.setDefaultSavepointDirectory();
        exeEnv.setParallelism(2);
        // table execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                        " id int,\n" +
                        " dept STRING,\n" +
                        " emp_id int\n" +
                        ") WITH (\n" +
                        " 'connector' = 'postgres-cdc',\n" +
                        " 'hostname' = '192.168.110.13',\n" +
                        " 'port' = '5432',\n" +
                        " 'username' = 'pgcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'schema-name' = 'public',\n" +
                        " 'debezium.snapshot.mode' = 'never',\n" +
                        " 'decoding.plugin.name' = 'pgoutput',\n" +
                        " 'debezium.slot.name' = 'testcdc',\n" +
                        " 'table-name' = 'testcdc'\n" +
                        ")";
        String sinkDDL =
                "CREATE TABLE sr_sink (\n" +
                        "id int ," +
                        "dept string," +
                        "emp_id int," +
                        "PRIMARY KEY (id) " +
                        "NOT ENFORCED" +
                        ") WITH ( " +
                        "'connector' = 'starrocks'," +
                        "'jdbc-url'='jdbc:mysql://192.168.110.170:9036,192.168.110.171:9036,192.168.110.172:9036'," +
                        "'load-url'='192.168.110.170:8036;192.168.110.171:8036;192.168.110.172:8036'," +
                        "'database-name' = 'test'," +
                        "'table-name' = 'testcdc'," +
                        "'username' = 'root'," +
                        "'password' = ''," +
                        "'sink.properties.column_separator' = '\\x01'," +
                        "'sink.properties.row_delimiter' = '\\x02'" +
                        ")";

        String transformSQL =
                "INSERT INTO sr_sink  select * from pgsql_source";
        // register the source and sink tables
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);

        // submit the sync job and print its result
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
        
    }
}

Table schemas

  • StarRocks

     CREATE TABLE `testcdc` (
      `id` int(11) NOT NULL DEFAULT "-1" COMMENT "",
      `dept` varchar(65533) NULL COMMENT "",
      `emp_id` int(11) NULL COMMENT ""
    ) ENGINE=OLAP 
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`) BUCKETS 8 
    PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT",
    "enable_persistent_index" = "false"
    );
    
  • PostgreSQL

    CREATE TABLE testcdc(
                            ID INT PRIMARY KEY      NOT NULL,
                            DEPT           CHAR(50) NOT NULL,
                            EMP_ID         INT      NOT NULL
    );
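
With both tables in place and the Flink job running, a quick end-to-end check is to change data on the PostgreSQL side and confirm it arrives in StarRocks. A minimal sketch (names are the ones used above; since snapshot.mode is 'never', only changes made after the job starts are captured):

    -- on PostgreSQL (psql, database test)
    INSERT INTO testcdc (id, dept, emp_id) VALUES (1, 'sales', 100);
    UPDATE testcdc SET dept = 'hr' WHERE id = 1;

    -- on StarRocks (MySQL client connected to an FE)
    SELECT * FROM test.testcdc;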
    
