Flink对接kafka

启动kafka和flink

1、进入zookeeper的bin目录下启动zookeeper

./zkServer.sh start

2、进入kafka的bin目录下启动kafka

/kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties

3、进入flink的bin目录下启动flink

./start-cluster.sh 

kafka启动生产者

kafka主题为sensor

./bin/kafka-console-producer.sh --broker-list 192.168.158.202:90992 --topic sensor

添加pom依赖

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

执行

Java代码如下

package com.test.apitest.souceTest;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

public class SourceTest02_kafka {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka配置项
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","192.168.153.202:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");



        // 从kafka中读取数据
        DataStreamSource<String> sensor = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        sensor.print();
        //执行任务
        env.execute();
    }
}

kafka生产数据

 

 flink消费数据

 

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...