Flink JdbcUpsertTableSink不会更新mysql数据或覆盖旧数据

问题描述

我想问一下我何时使用JdbcUpsertTableSink将数据写入MySQL,数据没有更新,我设置了MySQL的主键,等等。

  1. 我正在使用flink 1.11版
  2. 我从蜂巢读取数据并写入mysql
  3. 下面是我的代码

public class TmpTableExample {
    public static void main(String[] args) throws IOException {


        ParameterToolFactory parameterToolFactory = new ParameterToolFactory();
        ParameterTool tool = parameterToolFactory.createParameterTool();

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        HiveCatalog testCataLog = new HiveCatalog(CataLogEnum.TEST.getCataLogName(),CataLogEnum.TEST.getDbName(),tool.get(FlinkProperEnum.FLINK_HIVE_CONF_DIR.key));

        
        tableEnv.registerCatalog(CataLogEnum.TEST.getCataLogName(),testCataLog);

        tableEnv.useCatalog(CataLogEnum.TEST.getCataLogName());
        String sql = "SELECT * FROM test.student as t1 JOIN test2.class t2 ON t1.id = t2.student_id";


        JdbcOptions options = JdbcOptions.builder()
                .setDBUrl(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_URL.key))
                .setDriverName(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_DRIVER_CLASS_NAME.key))
                .setUsername(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_USERNAME.key))
                .setPassword(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_PASSWORD.key))
                .setTableName("mysql_project_test")
                .setDialect(new MySQLDialect())
                .build();

        String[] fieldNames = {"student_id","student_name","student_curriculum","student_score","student_dt","class_id","class_student_id","class_name","class_size","class_dt"};

        DataType[] fieldTypes = {DataTypes.INT(),DataTypes.STRING(),DataTypes.DOUBLE(),DataTypes.INT(),DataTypes.STRING()};

        String[] keys = {"student_id","class_id"};

        TableSchema schema = TableSchema.builder()
                .fields(fieldNames,fieldTypes)
                .build();


        JdbcUpsertTableSink tableSink = JdbcUpsertTableSink.builder()
                .setOptions(options)
                .setTableSchema(schema)
                .setFlushIntervalMills(1000)
//                .setFlushMaxSize(10)
                .build();

        tableEnv.registerTableSink("mysql_project_test",tableSink);

        Table result = tableEnv.sqlQuery(sql);
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsert("mysql_project_test",result);
        statementSet.execute().print();
    }
}
  1. 我的mysql表结构如下
CREATE TABLE `mysql_project_test` (
  `student_id` int(11) NOT NULL DEFAULT '0',`student_name` varchar(200) DEFAULT NULL,`student_curriculum` varchar(200) DEFAULT NULL,`student_score` double DEFAULT NULL,`student_dt` varchar(200) DEFAULT NULL,`class_id` int(11) NOT NULL DEFAULT '0',`class_student_id` int(11) DEFAULT NULL,`class_name` varchar(200) DEFAULT NULL,`class_size` int(11) DEFAULT NULL,`class_dt` varchar(200) DEFAULT NULL,PRIMARY KEY (`student_id`,`class_id`)

当我在Java程序中修改学生分数时,数据不会更新,或者Flink不会写入数据。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)