声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习。
注意 1. Flink使用1.11.0版本、HIVE使用3.1.2版本、Hadoop使用3.1.3版本
注意 2. 将hive-site.xml文件放在maven项目的resource目录下。
注意 3. 不编写脚本的话要执行 export HADOOP_CLASSPATH=`hadoop classpath` 语句
第一步:pom依赖
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Versions are declared once here and referenced by every dependency below. -->
    <flink.version>1.11.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <log4j.version>2.12.1</log4j.version>
    <hive.version>3.1.2</hive.version>
    <hadoop.version>3.1.3</hadoop.version>
</properties>

<dependencies>
    <!-- Flink Dependency -->
    <!-- All dependencies use the "provided" scope, so none of them is packed into the job jar;
         the corresponding jars must be available on the Flink cluster classpath at runtime. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
第二步:编写代码如下 事先利用flink-sql-client 建立好表格
SET table.sql-dialect=hive;

-- Hive-dialect DDL for the sink table: Parquet files partitioned by day (dt) and hour (hr).
-- The TBLPROPERTIES configure the streaming sink:
--   * partition commit is triggered by partition time, built from the partition values
--     with the pattern '$dt $hr:00:00', with a commit delay of 1 hour;
--   * on commit, the 'metastore' and 'success-file' policies are applied;
--   * the rolling policy uses a 1 minute rollover interval, checked every 30 seconds.
CREATE TABLE hive_table_2 (
  log_info STRING
) PARTITIONED BY (
  dt STRING,
  hr STRING
) STORED AS PARQUET
LOCATION 'hdfs://localhost:9820/warehouse/gmall/test99'
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.delay'='1 h',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;

-- Default-dialect DDL for the Kafka source table read by the INSERT job.
-- log_ts is the event-time column; its watermark lags the timestamp by 5 seconds.
-- Records are JSON; missing fields do not fail the job and parse errors are ignored.
-- bootstrap.servers is a plain host:port list (no URL scheme).
CREATE TABLE kafka_table (
  log_info STRING,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_event_test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink_hive_test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
实际代码如下
package com.atguigu.flink.hive

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog

/** Streaming job that continuously copies rows from `kafka_table` into the
  * partitioned Hive table `hive_table_2`.
  *
  * Both tables are referenced unqualified, so they must already exist in the
  * `gmall` database of the Hive catalog registered below.
  */
object InsertData {

  /** Builds the table environment, registers the Hive catalog and submits the INSERT statement.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Checkpoint every 10 000 ms.
    env.enableCheckpointing(10000)

    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    val name = "myhive"
    val defaultDatabase = "gmall"
    val hiveConfDir = "/opt/module/hive/conf" // a local path

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
    // Register under the same name the catalog was created with and make it current.
    tEnv.registerCatalog(name, hive)
    tEnv.useCatalog(name)

    // The two DATE_FORMAT columns fill the dt and hr partition columns of hive_table_2.
    tEnv.executeSql(
      "INSERT INTO hive_table_2 SELECT log_info, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table"
    )
  }
}
第三步:打包提交到服务器
<build>
    <plugins>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR.
                                     The directory name is upper-case; the patterns are case-sensitive. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.atguigu.flink.hive.InsertData</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <!-- Scala Compiler -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <args>
                    <arg>-nobootcp</arg>
                </args>
                <addScalacArgs>-target:jvm-1.8</addScalacArgs>
            </configuration>
        </plugin>

        <!-- Eclipse Scala Integration -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <version>2.8</version>
            <configuration>
                <downloadSources>true</downloadSources>
                <projectnatures>
                    <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                    <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                </projectnatures>
                <buildcommands>
                    <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                </buildcommands>
                <classpathContainers>
                    <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                </classpathContainers>
                <excludes>
                    <exclude>org.scala-lang:scala-library</exclude>
                    <exclude>org.scala-lang:scala-compiler</exclude>
                </excludes>
                <sourceIncludes>
                    <sourceInclude>**/*.scala</sourceInclude>
                    <sourceInclude>**/*.java</sourceInclude>
                </sourceIncludes>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.7</version>
            <executions>
                <!-- Add src/main/scala to eclipse build path -->
                <execution>
                    <id>add-source</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>add-source</goal>
                    </goals>
                    <configuration>
                        <sources>
                            <source>src/main/scala</source>
                        </sources>
                    </configuration>
                </execution>
                <!-- Add src/test/scala to eclipse build path -->
                <execution>
                    <id>add-test-source</id>
                    <phase>generate-test-sources</phase>
                    <goals>
                        <goal>add-test-source</goal>
                    </goals>
                    <configuration>
                        <sources>
                            <source>src/test/scala</source>
                        </sources>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
第四步 提交 利用shell提交将包上传到hadoop102机器上的/opt/module/flink/examples 文件夹下
atguigu@hadoop102:/opt/module/flink$ bin/flink run -c com.atguigu.flink.hive.InsertData examples/TableFlink1113-1.0-SNAPSHOT.jar
第五步 遇到第一个错误
java.lang.NoClassDefFoundError: org/apache/flink/table/catalog/hive/HiveCatalog
at com.tal.flink.hive.StreamMain.main(StreamMain.java:50)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.hive.HiveCatalog
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
第六步 下载驱动包到 Flink的lib目录 解决第一个错误
cd /opt/module/flink/lib
下载与Hive 3.1.2、Flink 1.11.0、Scala 2.11对应的 flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar 到flink的lib文件夹下(若Flink集群已在运行,需重启后才会加载新的jar)
第七步 再次提交作业-任务提交成功