flinksql postgres到mysql

最近在学flink用sql语句写任务,postgres到MysqL总是报错,中间遇到了很多坑,网上查得写的都不详细,自己做完总结一下。

1.flink版本用的是1.12版本,高版本sql语法有变化,我使用的是1.12版本。

2.注意flink版本与驱动包版本相对应,高版本驱动跟flink版本不兼容导致任务启动失败。

3.postgres数据库MysqL数据库都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客

搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。

 MysqL表结构sql

CREATE TABLE `sync_test_1` (
  `total_gmv` bigint(20) DEFAULT NULL,
  `day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

postgres表结构

CREATE TABLE "public"."ball" (
  "total_gmv" int8,
  "day_time" varchar(32) COLLATE "pg_catalog"."default"
);

ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");

flinksql语句

--源表

CREATE TABLE pgtest (

  day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义

) WITH (

  'connector' = 'postgres-cdc',  -- 必须为 'postgres-cdc'

  'hostname' = 'localhost',     -- 数据库的 IP

  'port' = '5432',     -- 数据库的访问端口

  'username' = 'postgres',           -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)

  'password' = '123456',    -- 数据库访问使用的密码

  'database-name' = 'postgres',  -- 需要同步的数据库

  'schema-name' = 'public',      -- 需要同步的数据库模式 (Schema)

  'table-name' = 'ball' ,      -- 需要同步的数据表名

  'decoding.plugin.name' = 'pgoutput',  -- pgoutput,必须

   'debezium.slot.name' = 'pgtestflink'  -- 指定slot名称,必须

);

--结果表

create table MysqLtest ( day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

    'connector' = 'jdbc',

    'url' = 'jdbc:MysqL://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',

    'table-name' = 'sync_test_1',

    'username' = 'root',

    'password' = '123456'

);

INSERT INTO MysqLtest  

SELECT day_time,total_gmv

FROM pgtest ;

这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。 

反过来MysqL到postgres

--源表

create table MysqLtest ( day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

       'connector' = 'MysqL-cdc',

       'hostname' = '172.18.11.224',

       'port' = '3306',

       'username' = 'root',

       'password' = '123456',

       'database-name' = 'flinktest',

       'table-name' = 'sync_test_1'

);

//结果表

create table pgtest ( day_time VARCHAR,

    total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (

    'connector' = 'jdbc',

    'url' = 'jdbc:postgresql://localhost:5432/postgres',

    'table-name' = 'public.ball',

    'username' = 'postgres',

    'password' = '123456'

);

INSERT INTO pgtest  

SELECT day_time,total_gmv

FROM MysqLtest ;

注意:必须指定主键PRIMARY KEY (day_time) NOT ENFORCED,与数据表中的主键要对应。

相关文章

显卡天梯图2024最新版,显卡是电脑进行图形处理的重要设备,...
初始化电脑时出现问题怎么办,可以使用win系统的安装介质,连...
todesk远程开机怎么设置,两台电脑要在同一局域网内,然后需...
油猴谷歌插件怎么安装,可以通过谷歌应用商店进行安装,需要...
虚拟内存这个名词想必很多人都听说过,我们在使用电脑的时候...