Source 是 Flink 的数据源，下面简单介绍四种读取数据的方式：
1.从集合中读取
2.从文件中读取
3.从 Kafka 中读取
4.自定义Source
package com.jy.bjz.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Demonstrates four ways of declaring a Flink data source: an in-memory
 * collection, a text file, a Kafka topic, and a user-defined
 * {@link SourceFunction}.
 *
 * <p>NOTE: {@code main} only declares the sources. No sink is attached and
 * {@code env.execute()} is never called, so a downstream operator/sink and an
 * {@code execute()} call must be added before any data is actually read.
 *
 * @author baojiazhong
 * @since 2021/9/9 10:48
 */
public class SourceTest {

    /**
     * Builds one stream per source kind on a single streaming environment.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Read from an in-memory collection.
        DataStream<Integer> collectionDataStream = env.fromCollection(Arrays.asList(1, 2, 3, 4));

        // 2. Read a text file from a path on the local file system.
        String path = "C:\\Users\\15868\\Desktop\\TL\\flink\\learn\\src\\main\\resources\\hello.txt";
        DataStream<String> textDataStream = env.readTextFile(path);

        // 3. Read from Kafka: configure the consumer, then subscribe to topic "test".
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        // Fixed: the package was misspelled "org.apacje", which names a class that does not exist.
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStream<String> test =
                env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));

        // 4. Custom source: any implementation of SourceFunction<T>.
        DataStreamSource<String> mySourceDataStream = env.addSource(new MySource());
    }

    /**
     * Minimal custom source that emits a single empty string and then finishes.
     */
    public static class MySource implements SourceFunction<String> {

        /**
         * Emits the source's records through the given context.
         *
         * @param ctx context used to emit records downstream
         * @throws Exception declared by the {@link SourceFunction} contract
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Produce the data: a single empty record.
            ctx.collect("");
        }

        /**
         * Cancellation hook. Nothing to stop here: {@link #run} emits one
         * record and returns, so there is no loop to interrupt.
         */
        @Override
        public void cancel() {
            // Intentionally empty; a looping source would clear its "running" flag here.
        }
    }
}