Flink流处理-Task之KafkaSourceDataTask

KafkaSourceDataTask

package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;

/**

  • 主要完成从Kafka集群读取车辆的json数据并将其转换成ItcastDataObj,并将其

  • 通过errorData过滤出来正常的数据或者错误的数据,将正确的数据保存到HDFS上

  • 和HBase上,将错误的数据保存到HDFS上
    */
    public class KafkaSourceDataTask extends BaseTask {
    public static void main(String[] args) throws Exception{
    //1. 获取当前流执行环境-env
    StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());

     //2. 获取Kafka中的车辆数据json字符串
     DataStreamSource<String> source = getKafkaStream(
             env,
             "__vehicle_consumer_",
             SimpleStringSchema.class
     );
     //-- 打印输出
     source.printToErr();
    
     //3. 将读取出来的json字符串转换为ItcastDataObj
     SingleOutputStreamOperator<ItcastDataObj> vehicleDataStream = source.map(
             new MapFunction<String, ItcastDataObj>() {
                 @Override
                 public ItcastDataObj map(String line) throws Exception {
                     return JsonParseUtil.parseJsonToObject(line);
                 }
             }
     );
     //-- 另种写法
     DataStream<ItcastDataObj> vehicleDataStream02 = source.map(JsonParseUtil::parseJsonToObject);
     //vehicleDataStream.printToErr();
     vehicleDataStream02.printToErr();
    
     //触发执行
     env.execute();
    

    }
    }

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...