问题描述
我正在使用flink Latest(1.11.2)来处理示例MysqL数据库,该数据库可以正常工作。
此外,我已将flink-connector-jdbc_2.11-1.11.2,mysql-connector-java-8.0.21.jar,postgresql-42.2.17.jar添加到{FLINK} / lib
这是我的代码
sirius:/cours/journalisme/ia/ryzen7 # ./rigolade.py
./rigolade.py: line 4: from: command not found
./rigolade.py: line 6: Syntax error near unexpected token `('
./rigolade.py: line 6: `segment_image=instance_segmentation()'
T_CONfig = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV,T_CONfig)
ddl = """
CREATE TABLE nba_player4 (
first_name STRING,last_name STRING,email STRING,id INT
) WITH (
'connector' = 'jdbc','url' = 'jdbc:MysqL://localhost:3306/inventory','username' = 'root','password' = 'debezium','table-name' = 'customers'
)
""";
BT_ENV.sql_update(ddl)
sinkddl = """
CREATE TABLE print_table (
f0 INT,f1 INT,f2 STRING,f3 DOUBLE
) WITH (
'connector' = 'print'
)
""";
BT_ENV.sql_update(sinkddl)
sqlquery("SELECT first_name,last_name FROM nba_player4 ");
BT_ENV.execute("table_job")
最新:
这是我的docker yml文件。
py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: sql validation Failed. findAndCreateTableSource Failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: required context properties mismatch.
The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=email
schema.3.data-type=INT
schema.3.name=id
table-name=customers
url=jdbc:MysqL://localhost:3306/inventory
username=root
The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
docker ps命令显示
version: '2.1'
services:
jobmanager:
build: .
image: flink:latest
hostname: "jobmanager"
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:latest
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- jobmanager:jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
MysqL:
image: debezium/example-MysqL
ports:
- "3306:3306"
environment:
- MysqL_ROOT_PASSWORD=debezium
- MysqL_USER=MysqLuser
- MysqL_PASSWORD=MysqLpw
更多信息:
我当前在docker中的flink环境是flink:scala_2.12-java8
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cf84c84f7821 flink "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 6121-6123/tcp,8081/tcp _taskmanager_1
09b19142d70a flink "/docker-entrypoint.…" 9 minutes ago Up 9 minutes 6123/tcp,0.0.0.0:8081->8081/tcp _jobmanager_1
4ac01eb11bf7 debezium/example-MysqL "docker-entrypoint.s…" 3 days ago Up 9 minutes 0.0.0.0:3306->3306/tcp,33060/tcp keras-flask-dep
pyflink jdbc连接器是flink 1.11版本的flink-connector-jdbc_2.11-1.11.2.jar。
docker pull flink:scala_2.12-java8
为了使用jdbc库,我尝试了两种方法
-
将flink-connector-jdbc_2.11-1.11.2.jar保存到/usr/local/lib/python3.7/site-packages/flink/lib
-
在python应用中配置类路径
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
但仍然出现相同的错误
解决方法
这可能无法完全回答问题,但是:从MySQL的角度来看,您的CREATE TABLE
语句不是有效的SQL,并且会引发语法错误。原因是VARCHAR
数据类型需要一个长度(即列可以容纳的最大字符数)。
例如:
CREATE TABLE nba_player4 (
first_name VARCHAR(20),last_name VARCHAR(20),email VARCHAR(50),id VARCHAR(10)
);
现在这是有效的MySQL代码。不过,我仍然建议在表中定义一个主键。在数据库设计中,这是一个好习惯,原因有很多,其中之一就是能够唯一地标识每个记录:这使得可以使用WHERE
子句准确选择给定记录,或者建立引用的外键约束桌子。名为id
的列可能是一个很好的候选者-可能最好将其定义为自动递增的整数。
所以,再见:
CREATE TABLE nba_player4 (
first_name VARCHAR(20),id INT PRIMARY KEY AUTO_INCREMENT
);
,
可以验证您使用的所有组件版本。很有可能您没有使用1.9版本的Flink,因为我看到它会产生一种新的数据类型属性格式,该格式将在以后的版本中引入。
在Flink 1.9中(至少在我检查过的1.9.3中是如此),属性的格式应为:schema.#.type
,而在您的情况下,属性应为schema.#.data-type
。
我建议要么升级到最新的Flink版本,要么至少确保您使用具有相同版本的所有组件。