问题描述
我试图在Flink中使用带引号的标识符(主要是因为我有一些与year
之类的关键字冲突的列名)。但是我无法解析。
我归纳为一个最小的失败示例:
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
String sql = "CREATE TABLE table1(\"ts\" TIMESTAMP) WITH ('connector' = 'filesystem','path' = 'file:///Users/ecerulm/tmp/csv/','format' = 'csv');";
System.out.println(sql);
tEnv.executesql(sql);
未能抱怨"ts"
部分:
org.apache.flink.table.api.sqlParserException: sql parse Failed. Encountered "\"" at line 1,column 21.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executesql(TableEnvironmentImpl.java:658)
at com.rubenlaguna.BatchJobTest.testBatchJob(BatchJobTest.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runchild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runchild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runchildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
据我从异常中了解到,它不期望使用"
字符,因为它与<QUOTED_IDENTIFIER>
之类的任何解析选项都不匹配,但据我从Calcite documentation所理解的,引用的标识符的语法只是"myidentifier"
。
解决方法
Flink documentation for Expanding Table Identifiers说:
标识符遵循SQL要求,这意味着可以使用反引号(`)对其进行转义。
它没有不提及双引号标识符。
因此,尽管Flink SQL基于Calcite SQL,尽管Calcite has double quoted identifiers似乎Flink SQL仅支持常规sql标识符+反引号标识符。因此,您将需要编写如下查询:
String sql = "CREATE TABLE table1(`ts` TIMESTAMP) WITH ('connector' = 'filesystem','path' = 'file:///Users/ecerulm/tmp/csv/','format' = 'csv')";
tEnv.executeSql(sql);
因此,使用"ts"
(带反引号)代替`ts`
(带双引号)。