Flink SQL 写出到 PostgreSQL 并关联 MySQL 维表
注意事项
public class Test{
{
public static void main(String[] args) throws Exception {
EnvironmentSettings build = EnvironmentSettings.newInstance().inStreamingMode().useAnyPlanner().useBlinkPlanner().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, build);
kafkaToPostgressql(tableEnv);
}
private static void kafkaToPostgressql(StreamTableEnvironment tableEnv) throws Exception {
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.mode","EXACTLY_ONCE");
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval","500");
tableEnv.executesql("DROP TABLE IF EXISTS source_kafka");
// 订单源表
tableEnv.executesql(
"CREATE TABLE source_kafka ( \n" +
" `id` string, \n" +
" `user_id` STRING, \n" +
" `sku_id` STRING , \n" +
" `final_amount` STRING , \n" +
" `order_status` STRING , \n" +
" `delivery_address` STRING , \n" +
" `pay_id` STRING , \n" +
" `create_time` STRING , \n" +
" `expire_time` STRING , \n" +
" proctime as PROCTIME(), \n" +
" eventTime AS TO_TIMESTAMP(create_time) , \n" +
" WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND \n" +
" ) WITH ( \n" +
" 'connector' = 'kafka', \n" +
" 'topic' = 'kafka_pg_order', \n" +
" 'properties.bootstrap.servers' = 'ip:port', \n" +
" 'properties.group.id' = 'testGroup', \n" +
" 'scan.startup.mode' = 'earliest-offset', \n" +
" 'format' = 'json' \n" +
" )");
//维表信息 商品信息表
tableEnv.executesql(
" CREATE TABLE sku_dim ( \n" +
" id String , \n" +
" price decimal(18,2) , \n" +
" sku_name String , \n" +
" sku_desc String , \n" +
" weight decimal(18,2) , \n" +
" is_sale String , \n" +
" create_time String , \n" +
" PRIMARY KEY(id) NOT ENFORCED \n" +
" ) WITH ( \n" +
" 'connector' = 'jdbc', \n" +
" 'url' = 'jdbc:MysqL://ip:port/t3st?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true', \n" +
" 'table-name' = 't2' , \n" +
" 'username' = 'user' , \n" +
" 'password' = 'password' \n" +
" )");
//Sink 数据到PG
tableEnv.executesql(
"CREATE TABLE sink_pg (\n" +
" `id` string,\n" +
" `sku_name` string,\n" +
" `price_all` decimal(18,2) ,\n" +
" `times` string ,\n" +
" PRIMARY KEY (id, sku_name, times) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:postgresql://ip:port/postgres',\n" +
" 'username' = 'username',\n" +
" 'password' = 'password',\n" +
" 'table-name' = 'postgres.test.sku_price_today'\n" +
")");
// 临时表
tableEnv.executesql(" CREATE VIEW tmp_table AS \n" +
" select \n" +
" sku.id as id, \n" +
" sku.sku_name as sku_name, \n" +
" orders.final_amount , \n" +
" DATE_FORMAT(orders.create_time,'yyyy-MM-dd') AS create_time \n" +
" from \n" +
" source_kafka orders \n" +
" left join sku_dim FOR SYstem_TIME AS OF orders.proctime AS sku \n" +
" on orders.sku_id = sku.id ");
// 插入数据
tableEnv.executesql("insert into sink_pg (id ,sku_name , price_all , times )\n" +
"select \n" +
" id id ,\n" +
" sku_name sku_name,\n" +
" cast(sum(cast(final_amount as double)) as DECIMAL(18, 2) ) price_all , \n" +
" create_time as times \n" +
"from \n" +
"tmp_table t \n" +
"group by id,sku_name,create_time");
System.out.println("启动成功");
}
}
}