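Problem description
When I use JdbcUpsertTableSink to write data to MySQL, the data is not updated, even though I have set a primary key on the MySQL table.
- I am using Flink 1.11
- I read data from Hive and write it to MySQL
- Here is my code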
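import java.io.IOException;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.types.DataType;

// ParameterToolFactory, CataLogEnum and FlinkProperEnum are classes from my own project.
public class TmpTableExample {
    public static void main(String[] args) throws IOException {
        ParameterToolFactory parameterToolFactory = new ParameterToolFactory();
        ParameterTool tool = parameterToolFactory.createParameterTool();

        // Blink planner in batch mode
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // The second call overrides the first, so the DEFAULT dialect is in effect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Register the Hive catalog and make it the current one
        HiveCatalog testCataLog = new HiveCatalog(CataLogEnum.TEST.getCataLogName(), CataLogEnum.TEST.getDbName(), tool.get(FlinkProperEnum.FLINK_HIVE_CONF_DIR.key));
        tableEnv.registerCatalog(CataLogEnum.TEST.getCataLogName(), testCataLog);
        tableEnv.useCatalog(CataLogEnum.TEST.getCataLogName());

        String sql = "SELECT * FROM test.student as t1 JOIN test2.class t2 ON t1.id = t2.student_id";

        // MySQL connection options
        JdbcOptions options = JdbcOptions.builder()
                .setDBUrl(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_URL.key))
                .setDriverName(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_DRIVER_CLASS_NAME.key))
                .setUsername(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_USERNAME.key))
                .setPassword(tool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_PASSWORD.key))
                .setTableName("mysql_project_test")
                .setDialect(new MySQLDialect())
                .build();

        String[] fieldNames = {"student_id", "student_name", "student_curriculum", "student_score", "student_dt", "class_id", "class_student_id", "class_name", "class_size", "class_dt"};
        // Note: only 5 types are listed for the 10 field names above; TableSchema needs one type per field.
        DataType[] fieldTypes = {DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.INT(), DataTypes.STRING()};
        // Note: keys is declared but never passed to the sink below.
        String[] keys = {"student_id", "class_id"};
        TableSchema schema = TableSchema.builder()
                .fields(fieldNames, fieldTypes)
                .build();

        JdbcUpsertTableSink tableSink = JdbcUpsertTableSink.builder()
                .setOptions(options)
                .setTableSchema(schema)
                .setFlushIntervalMills(1000)
                // .setFlushMaxSize(10)
                .build();
        tableEnv.registerTableSink("mysql_project_test", tableSink);

        // Run the join and insert the result into the MySQL sink
        Table result = tableEnv.sqlQuery(sql);
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsert("mysql_project_test", result);
        statementSet.execute().print();
    }
}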
- My MySQL table structure is as follows
CREATE TABLE `mysql_project_test` (
    `student_id` int(11) NOT NULL DEFAULT '0',
    `student_name` varchar(200) DEFAULT NULL,
    `student_curriculum` varchar(200) DEFAULT NULL,
    `student_score` double DEFAULT NULL,
    `student_dt` varchar(200) DEFAULT NULL,
    `class_id` int(11) NOT NULL DEFAULT '0',
    `class_student_id` int(11) DEFAULT NULL,
    `class_name` varchar(200) DEFAULT NULL,
    `class_size` int(11) DEFAULT NULL,
    `class_dt` varchar(200) DEFAULT NULL,
    PRIMARY KEY (`student_id`,`class_id`)
);
When I change a student's score in the Java program, the data is not updated, or Flink does not write the data at all.
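For context, a plain INSERT does not overwrite an existing MySQL row that has the same primary key; an update only happens if the statement that reaches MySQL is an upsert such as INSERT ... ON DUPLICATE KEY UPDATE. The following is a minimal sketch in plain Java of what such a statement looks like for a given table and column list. The class and method names are hypothetical and are not Flink's internal code; it is meant only as something to compare against the MySQL general query log when checking what the sink actually sends.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class MySqlUpsertStatementSketch {

    // Wraps an identifier in backticks, the MySQL quoting style.
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    // Builds a parameterized INSERT ... ON DUPLICATE KEY UPDATE statement.
    // Every listed column is overwritten when the primary key already exists.
    static String upsertStatement(String table, String[] fields) {
        String columns = Arrays.stream(fields)
                .map(MySqlUpsertStatementSketch::quote)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fields)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        String updates = Arrays.stream(fields)
                .map(f -> quote(f) + "=VALUES(" + quote(f) + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + columns + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // Shortened column list from the table in the question.
        String[] fields = {"student_id", "class_id", "student_score"};
        System.out.println(upsertStatement("mysql_project_test", fields));
    }
}
```

If the statements logged by MySQL for the job are plain INSERTs without the ON DUPLICATE KEY UPDATE clause, that would suggest the sink is running in append mode rather than upsert mode.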
Solution
No confirmed solution has been found for this problem yet.
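One direction that may be worth trying, offered as an untested suggestion rather than a confirmed fix: Flink 1.11 also lets a JDBC sink be declared through SQL DDL with the 'connector' = 'jdbc' option, and its documentation describes the sink as working in upsert mode when a PRIMARY KEY ... NOT ENFORCED clause is declared. The sketch below only holds such a DDL string in plain Java so it can be inspected; the class name, URL, username and password are placeholders, and the column types are inferred from the MySQL table in the question.

```java
public class JdbcSinkDdlSketch {

    // Flink SQL DDL for the sink table.
    // The URL, username and password are placeholder values.
    static final String SINK_DDL =
            "CREATE TABLE mysql_project_test (\n"
            + "  student_id INT,\n"
            + "  student_name STRING,\n"
            + "  student_curriculum STRING,\n"
            + "  student_score DOUBLE,\n"
            + "  student_dt STRING,\n"
            + "  class_id INT,\n"
            + "  class_student_id INT,\n"
            + "  class_name STRING,\n"
            + "  class_size INT,\n"
            + "  class_dt STRING,\n"
            + "  PRIMARY KEY (student_id, class_id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'jdbc',\n"
            + "  'url' = 'jdbc:mysql://localhost:3306/test',\n"
            + "  'table-name' = 'mysql_project_test',\n"
            + "  'username' = 'user',\n"
            + "  'password' = 'secret'\n"
            + ")";

    public static void main(String[] args) {
        // In a Flink 1.11 job this string would be passed to
        // tableEnv.executeSql(SINK_DDL), followed by an
        // INSERT INTO mysql_project_test SELECT ... statement.
        System.out.println(SINK_DDL);
    }
}
```

With this approach the key columns are stated explicitly in the table definition instead of being left for the planner to derive from the join query.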