Flink 实现单词计数并写入 MySQL
依赖:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Legacy JDBC connector providing org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.
         1.10.x is the last release under this artifactId (Flink 1.11+ replaced it with
         flink-connector-jdbc, package org.apache.flink.connector.jdbc), which is why its
         version differs from the other Flink artifacts.
         NOTE(review): mixing a 1.10 connector with 1.14 core is not officially supported;
         consider migrating to flink-connector-jdbc_2.12:1.14.0.
         The connector is NOT shipped with the Flink distribution, so it must not be
         "provided": it has to be on the classpath for local runs and bundled into the job jar. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>1.10.0</version>
    </dependency>
    <!-- MySQL JDBC driver (Connector/J 8.x); supplies com.mysql.cj.jdbc.Driver used by the job. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
</dependencies>
导包:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

import java.sql.Types;
import java.util.Arrays;
import java.util.Iterator;
java代码:
/**
 * Batch word count using Flink's DataSet API.
 *
 * <p>Reads {@code datas/1.txt}, strips '?', '.' and ',' from each line and splits it into words.
 * It then counts the occurrences of each word (case-sensitive). Finally it prints the counts and
 * inserts them into the MySQL table {@code words(word, count)} through the legacy
 * {@code JDBCOutputFormat}.
 */
public class toMysqL {
    public static void main(String[] args) throws Exception {
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                // Java class names are case-sensitive: the Connector/J 8.x driver class lives in
                // the all-lowercase package "com.mysql.cj.jdbc".
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456")
                .setQuery("insert into words (word,count) values (?,?) ")
                // JDBC types of the Row fields in order (word, count); without them the sink
                // has to guess each field's type when binding the statement parameters.
                .setSqlTypes(new int[] {Types.VARCHAR, Types.INTEGER})
                // Submit the pending batch to the database after every 2 rows.
                .setBatchInterval(2)
                .finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.readTextFile("datas/1.txt");

        // Split each line into individual words.
        FlatMapOperator<String, String> data1 = data.flatMap(new FlatMapIterator<String, String>() {
            @Override
            public Iterator<String> flatMap(String s) throws Exception {
                // Remove the punctuation marks '?', '.' and ',' first.
                String cleaned = s.replace("?", "").replace(".", "").replace(",", "");
                // Split on runs of whitespace and drop empty tokens, so blank lines, leading
                // spaces or repeated spaces do not produce an empty "word".
                return Arrays.stream(cleaned.split("\\s+"))
                        .filter(word -> !word.isEmpty())
                        .iterator();
            }
        });

        // Pair every word with an initial count of 1.
        MapOperator<String, Tuple2<String, Integer>> data2 = data1.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        // Group the tuples by their first field (the word) and sum the second field (the count).
        AggregateOperator<Tuple2<String, Integer>> data3 = data2.groupBy(0).sum(1);

        // Convert each (word, count) tuple into a 2-field Row, the record type JDBCOutputFormat writes.
        MapOperator<Tuple2<String, Integer>, Row> data4 = data3.map(new MapFunction<Tuple2<String, Integer>, Row>() {
            @Override
            public Row map(Tuple2<String, Integer> ss) throws Exception {
                Row row = new Row(2);
                row.setField(0, ss.f0);
                row.setField(1, ss.f1);
                return row;
            }
        });

        // In the DataSet API print() is eager: it runs a job of its own immediately. The JDBC
        // sink below is executed by a second job (env.execute()), which reads the input again.
        // Keep output()/execute() after print(): if the sink were registered first, print()
        // would already run it and execute() would fail because no new sinks were defined.
        data4.print();
        data4.output(jdbcOutput);
        env.execute();
    }
}
结果:
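由于 MySQL 默认的排序规则(如 utf8mb4_general_ci、utf8mb4_0900_ai_ci)在比较字符串值时不区分大小写,而本程序统计单词时区分大小写(例如 "The" 和 "the" 会作为两个不同的单词分别写入),因此不要把 word 设置为主键(或唯一键),否则大小写不同的单词会发生主键冲突。如果确实需要以 word 为主键,可将该列的排序规则设为区分大小写的 utf8mb4_bin。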