通过java方式使用Kafka

一、kafka基本概念

     kafka将一个topic分为多个Partition,Partition在物理上由多个segment数据文件组成,每个segment数据文件都大小相等,按照顺序读写。每个Partition上的数据都均衡的分布在不同的broker上,partition的个数不能超过broker节点的个数。

      一个Partition上的消息是时间有序的,多个Partition之间的顺序无法保证

      kafka中很重要的特性,只需要一次消息,可以支持任意多的应用读取这个消息,consumer通过pull方式消费消息,kafka不删除已消费的消息,kafka中的数据的删除和其是否消息没有关系,只跟kafka broker上的两个配置有关系

  •  log.retention.hours=48 ===>数据最多保存48小时
  •  log.retention.byte=1073741824 ===>数据量最大1g

二、编写生产者客户端

2.1  引入pom


  <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.4.1</version>
   </dependency>

2.2  编写生产者客户端代码 


    public static void main(String[] args) {
        Properties prop = new Properties();

        prop.put("bootstrap.servers","192.168.221.131:9092");
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        prop.put("acks","all");
        prop.put("retries",0);
        prop.put("batch.size",16384);
        prop.put("linger.ms",1);
        prop.put("buffer.memory",33554432);
        String topic ="hello";
        KafkaProducer<String,String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),"hello 
        kafka3"));
        producer.close();

    }

2.3 ack 消息确认机制:有三个值:0、1、all

  • 如果acks=0:表示需要leader节点回复收到消息,这样生产者才会发送下一条数据
  • 如果acks=1:只要Partition leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。
  • 如果acks=all:表示需要所有leader+副本节点回复收到消息(acks=-1),这样生产者才会发送下一条数 据

 2.4  retries

如果当前请求失败,则生产者可以自动重新连接,但是要是设置retries=0参数,则意味着请示失败不会重新连接,这样可以避免重复发送的可能

2.5   key.serializer 、value.serializer

数据在网络中传输需要进行序列化

2.6   send()

方法中有三个参数,第一个是指定发送的主题,第二个是设置消息的key,第三个是消息value

三、编写消费者客户端

  Properties prop = new Properties();

        prop.put("bootstrap.servers","192.168.221.131:9092");
        prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        
        prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id","con-1");
        prop.put("auto.offset.reset","latest");
        //自动提交偏移量
        prop.put("auto.commit.intervals.ms","true");
        //自动提交时间
        prop.put("auto.commit.interval.ms","1000")
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);
        ArrayList<String> topics = new ArrayList<>();
        //可以订阅多个消息
        topics.add("hello");
        consumer.subscribe(topics);
        while(true){
            ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
            for(ConsumerRecord<String,String> consumerRecord :poll){
                System.out.println(consumerRecord);
            }
        }
    }

3.1  prop.put("group.id","con-1");

指定消费者组id,在同一时刻消费组只有一个线程可以去消费一个分区的数据,不同的消费组可以消费同一个分区的消息

 

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...