Flink 1.12.1 fails to read a Hive ORC table: Could not initialize class org.apache.orc.impl.ZlibCodec

Problem description

Environment: Flink 1.12.1, Hive 1.1.0, JDK 8

Hi all. I create a Kafka connector table with Flink SQL and join it against a Hive ORC table:

final StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
                .inStreamingMode().build();
final StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

.....

// note: the trailing space before "FOR SYSTEM_TIME" matters; without it the two
// string literals concatenate into "...hive_orc_tableFOR SYSTEM_TIME..."
bsTableEnv.executeSql("insert into kafka_table1 select a.clickhouse_time, a.op, a.deleted, a.name, a.id, " +
                "a.`table`, a.db, b.price from kafka_table as a left join hive_catalog.hive_database.hive_orc_table " +
                "FOR SYSTEM_TIME AS OF a.user_action_time AS b on a.id = b.id");

pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc-nohive_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.airlift/aircompressor -->
        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>aircompressor</artifactId>
            <version>0.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
            <version>1.5.6</version>
        </dependency>
    </dependencies>
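One thing worth checking before digging into the classpath: orc-core 1.5.6 declares its own aircompressor dependency, and the direct io.airlift:aircompressor 0.8 pin above overrides it (Maven prefers the nearest declaration). flink-orc-nohive additionally bundles a shaded copy of ORC. A sketch of one experiment, not a confirmed fix, is to drop the direct pin and let orc-core's declared version win:

```xml
<!-- Experiment (assumption, not a confirmed fix): remove the direct
     io.airlift:aircompressor 0.8 entry and keep only orc-core, so Maven
     resolves the aircompressor version that orc-core 1.5.6 itself declares.
     Verify which versions actually win with:
       mvn dependency:tree -Dincludes=io.airlift,org.apache.orc -->
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.5.6</version>
</dependency>
```

If the shaded ORC inside flink-orc-nohive and the unshaded orc-core still disagree, keeping only one of the two on the user classpath is the next thing to try.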

Then I build the final jar and run it with:

flink run myjar.jar

But the TaskManager log throws an exception:

java.io.IOException: Problem reading file footer hdfs://xxx:8020/user/hive/warehouse/xxx.db/xxx/000000_0

[TaskManager log screenshot]

And the contents of the Flink lib directory:

[Flink lib directory screenshot]

However, when I recreate the Hive table as a text table:

create table xxx stored as textfile as 
select * from pre_table

the job runs successfully.

Am I missing some dependency in the Flink lib directory, or is there a class file conflict? I haven't been able to track it down.
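The "Could not initialize class org.apache.orc.impl.ZlibCodec" wording in the title is itself a hint for the conflict theory: that error means the JVM *found* the class but its static initialization failed (now or on an earlier attempt), which is different from the class simply being absent. A small JDK-only diagnostic that distinguishes the cases (the class name is just the one from the error message; on a machine without the ORC jars it reports the class as absent):

```java
// Distinguish "class missing from classpath" from "class found but its
// static initializer failed" - the two causes behind ZlibCodec-style errors.
public class ClassLoadCheck {
    static String check(String className) {
        try {
            Class.forName(className);
            return "loaded OK";
        } catch (ClassNotFoundException e) {
            // No jar on the classpath provides this class at all.
            return "not on classpath";
        } catch (ExceptionInInitializerError e) {
            // Class was found, but its static initializer threw on first load.
            return "static initializer failed: " + e.getCause();
        } catch (NoClassDefFoundError e) {
            // "Could not initialize class X": an earlier load attempt failed.
            return "previously failed to initialize: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(check("org.apache.orc.impl.ZlibCodec"));
    }
}
```

Running something like this with the same classpath as the TaskManager would show whether ZlibCodec is missing, duplicated across conflicting jars, or failing in its static initializer.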

Workaround

No effective solution has been found for this problem yet.
