Problem description
Flink 1.12.1, Hive 1.1.0, JDK 8
Hi everyone. I am using Flink SQL to create a Kafka connector table and join it against a Hive ORC table:
final StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
final StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
.....
bsTableEnv.executeSql("insert into kafka_table1 select a.clickhouse_time,a.op,a.deleted,a.name,a.id," +
        "'a.table',a.db,b.price from kafka_table as a left join hive_catalog.hive_database.hive_orc_table " +
        "FOR SYSTEM_TIME AS OF a.user_action_time AS b on a.id = b.id");
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc-nohive_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.airlift/aircompressor -->
    <dependency>
        <groupId>io.airlift</groupId>
        <artifactId>aircompressor</artifactId>
        <version>0.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.orc</groupId>
        <artifactId>orc-core</artifactId>
        <version>1.5.6</version>
    </dependency>
</dependencies>
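One frequent cause of class-file conflicts with a setup like this is packaging Flink's own modules into the fat jar: the Flink Hive integration docs recommend keeping the core Flink dependencies at provided scope and shipping the Hive connector jars via the cluster's lib/ directory instead. A minimal sketch of the idea (which dependencies to demote is an assumption; adjust to your deployment):

<!-- Sketch: modules already on the cluster classpath are marked provided -->
<!-- so they stay out of the fat jar and cannot shadow the cluster's copies. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>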
Finally I build the artifact and run it with flink run myjar.jar, but the TaskManager log shows the following exception:
java.io.IOException: Problem reading file footer hdfs://xxx:8020/user/hive/warehouse/xxx.db/xxx/000000_0
However, when I change the Hive table storage format:
create table xxx stored as textfile as
select * from pre_table
the job runs successfully.
Am I missing some dependency on a Flink library, or is this a class-file conflict?
I have not been able to figure it out.
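One way to narrow this down is to query the Hive ORC table on its own, without the temporal join; if the footer error still appears, the problem lies in the ORC read path (for example, a mismatch between the orc-core 1.5.6 reader and files written by Hive 1.1.0) rather than in the join itself. A minimal probe, assuming the same table names as above:

// Probe: scan the Hive ORC table directly. If this alone throws
// "Problem reading file footer", the join is not the culprit.
bsTableEnv.executeSql(
        "select * from hive_catalog.hive_database.hive_orc_table limit 10")
    .print();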
Solution
No effective solution has been found for this problem yet.