kafka再均衡监听器测试

【README】

  • 本文使用的kafka是最新的 kafka3.0.0;本文kafka集群有3个节点分别是 centos201, centos202, centos203 ; brokerid 分别为 1,2,3;
  • 本文主要用于测试 再均衡监听器;当有新消费者加入时,会发生分区再均衡;再均衡前就会调用再均衡监听器的 onPartitionsRevoked()方法
  • 本文的测试主题 hello12,有3个分区,每个分区2个副本;


【1】再均衡监听器

1)应用场景 

在消费者退出或进行分区再均衡前,会做一些清理工作,如提交偏移量,或关闭数据库连接;这些工作可以通过监听器来实现;

2)再均衡监听器 ConsumerRebalanceListener

实现 该监听器即可,有3个方法

  1. onPartitionsRevoked:在分区均衡开始【前】和消费者停止读取消息【后】被调用
  2. onPartitionsAssigned:分区再均衡【后】和消费者开始读取消息【前】被调用
  3. onPartitionsLost:分区宕机时调用(本文不涉及);
/**
 * @Description 消费者分区再均衡监听器实现类
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2021年12月11日
 */
public class ConsumerBalanceListenerImpl implements ConsumerRebalanceListener {

    /** 消费者 */
    private Consumer consumer;

    /** 主题分区偏移量数组  */
    private MyTopicPartitionOffset[] topicPartitionOffsetArr;

    /**
     * @description 构造器
     * @param consumer 消费者
     * @param curOffsets 当前偏移量
     * @author xiao tang
     * @date 2021/12/11
     */
    public ConsumerBalanceListenerImpl(Consumer consumer, MyTopicPartitionOffset[] topicPartitionOffsetArr) {
        this.consumer = consumer;
        this.topicPartitionOffsetArr = topicPartitionOffsetArr;
    }
    /** 
     * @description 在分区均衡开始【前】和消费者停止读取消息【后】被调用
     * @param partitions 分区列表(分区号从0开始计数)
     * @author xiao tang
     * @date 2021/12/12 
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("=== 分区再均衡触发onPartitionsRevoked()方法");
        // 提交偏移量回调(或记录错误日志)
        OffsetCommitCallback offsetCommitCallback = new OffsetCommitCallbackImpl();
        // 打印日志
        Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x->
            System.out.printf("提交偏移量信息,partition【%d】offset【%s】\n", x.partition(), x.offset())
        );

        // 把数组转为主题分区与偏移量映射,并提交最后一次处理的偏移量 (可以异步,也可以同步)
        // 同步提交一直重试或报超时异常
        // 异步提交传入提交回调,失败自行处理
        consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), offsetCommitCallback);
        // 停止程序的原因在于做实验,下次从本次提交的偏移量开始消费
        throw new RuntimeException("发生分区再均衡,程序结束");
    }
    /**
     * @description 分区再均衡【后】和消费者开始读取消息【前】被调用
     * @param partitions 主题分区列表
     * @author xiao tang
     * @date 2021/12/12
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // do sth
    }
    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        ConsumerRebalanceListener.super.onPartitionsLost(partitions);
    }
}

为了测试,分区再均衡监听器中,onPartitionsRevoked() 方法提交最后已消费消息的偏移量后,就抛出运行时异常结束运行,让其他消费者消费以便查看监听器是否成功提交偏移量;

3)消费者工具  

/**
 * @Description 消费者工具
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2021年12月12日
 */
public enum MyConsumerUtils {
    /** 单例 */
    INSTANCE;

    /**
     * @description 获取主题分区与偏移量映射
     * @param topicPartitionOffsetArr 主题分区与偏移量数组
     * @return 主题分区与偏移量映射
     * @author xiao tang
     * @date 2021/12/12
     */
    public static Map<TopicPartition, OffsetAndMetadata> getTopicPartitionOffsetMetadataMap(
            MyTopicPartitionOffset[] topicPartitionOffsetArr) {
        // 主题分区与偏移量映射
        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMetadataMap = new HashMap<>(topicPartitionOffsetArr.length);
        // 分区号大于-1,才是消费者接收的分区
        Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x -> {
            topicPartitionOffsetMetadataMap.put(
                    new TopicPartition(x.topic(), x.partition()), new OffsetAndMetadata(x.offset(), "no Metadata"));
        });
        return topicPartitionOffsetMetadataMap;
    }
}

 


【2】生产者 

/**
 * @Description 生产者
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2021年12月03日
 */
public class MyProducer {
    /** 主题 */
    public final static String TOPIC_NAME = "hello12";

    public static void main(String[] args) {
        /* 1.创建kafka生产者的配置信息 */
        Properties props = new Properties();
        /*2.指定连接的kafka集群, broker-list */
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        /*3.ack应答级别*/
        props.put(ProducerConfig.ACKS_CONfig, "all");
        /*4.重试次数*/
        props.put(ProducerConfig.RETRIES_CONfig, 0);
        /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */
        props.put(ProducerConfig.BATCH_SIZE_CONfig, 16 * KfkNumConst._1K);
        /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */
        props.put(ProducerConfig.LINGER_MS_CONfig, 1);

        // 超时时间
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONfig, 3000);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONfig, 3000);

        /*7. RecordAccumulator 缓冲区大小*/
        props.put(ProducerConfig.BUFFER_MEMORY_CONfig, 32 * KfkNumConst._1M);
        /*8. key, value 的序列化类 */
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig, StringSerializer.class.getName());

        /** 设置压缩算法 */
        props.put(ProducerConfig.COMPRESSION_TYPE_CONfig, "snappy");
        /** 设置拦截器 */
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONfig, Arrays.asList(TimeInterceptor.class.getName()));
        /** 设置阻塞超时时间 */
        props.put(ProducerConfig.MAX_BLOCK_MS_CONfig, 3600 * 1000);

        /* 9.创建生产者对象 */
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        /* 10.发送数据 */
        int order = 1;
        for (int i = 0; i < 10000; i++) {
            for (int j = 0; j < 3; j++) {
                Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));
                try {
                    System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");
                } catch (Exception e) {
                }
            }
        }
        /* 11.关闭资源 */
        producer.close();
        System.out.println("kafka生产者写入数据完成");
    }
}

 【3】消费者

【3.1】带有均衡监听器的消费者1

/**
 * @Description 带有均衡监听器的消费者
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2021年12月11日
 */
public class MyConsumerWithRebalanceListener {
    public static void main(String[] args) {
        // 创建消费者配置信息
        Properties props = new Properties();
        // 属性配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONfig, MyProducer.TOPIC_NAME + "G1");
        //   关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig, false);
        // 设置消费消息的位置,消费最新消息
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig, "latest");
        // 设置分区策略 (认值-RangeAssignor)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONfig, RangeAssignor.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONfig, RoundRobinAssignor.class.getName());

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        int partitionSize = consumer.partitionsFor(MyProducer.TOPIC_NAME).size();
        // 创建分区偏移量数组并初始化 (仅考虑一个topic的情况)
        MyTopicPartitionOffset[] topicPartitionOffsetArr = new MyTopicPartitionOffset[partitionSize];
        IntStream.range(0, partitionSize).forEach(x -> topicPartitionOffsetArr[x] = new MyTopicPartitionOffset());

        // 订阅主题, 【传入分区再均衡监听器】
        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME), new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));

        // 循环拉取
        try {
            while (!Thread.interrupted()) {
                System.out.println(DateUtils.getNowTimestamp() + " > 带均衡监听器消费者等待消费消息");
                TimeUtils.sleep(1000);
                // 消费消息
                ConsumerRecords<String, String> consumerRds = consumer.poll(100);
                System.out.println("poll begin {");
                for (ConsumerRecord<String, String> rd : consumerRds) {
                    System.out.println("消费者-WithRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
                    // 提交的偏移量,是 当前消息偏移量加1
                    topicPartitionOffsetArr[rd.partition()].setAll(rd.topic(), rd.partition(), rd.offset() + 1);
                }
                System.out.println("poll end } ");
                // 【异步提交每个分区的偏移量】
                consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), new OffsetCommitCallbackImpl());
            }
        } finally {
            try {
                // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误
                consumer.commitSync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr));
            } finally {
                // 记得关闭消费者
                consumer.close();
            }
        }
    }
}

【3.2】 不带均衡监听器的消费者2 (测试用)

一个普通消费者;

/**
 * @Description 不带均衡监听器的消费者
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2021年12月11日
 */
public class MyConsumernorebalanceListener {
	public static void main(String[] args) {
		// 创建消费者配置信息
		Properties props = new Properties();
		// 属性配置
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig, StringDeserializer.class.getName());
		props.put(ConsumerConfig.GROUP_ID_CONfig, MyProducer.TOPIC_NAME + "G1");
		//   关闭自动提交
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig, false);
		// 设置消费消息的位置,消费最新消息
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig, "earliest");
		// 设置分区策略 (认值-RangeAssignor)
		props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONfig, RangeAssignor.class.getName());
		props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONfig, RoundRobinAssignor.class.getName());

		// 创建消费者
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// 订阅主题, 【没有分区再均衡监听器】
		consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));

		// 循环拉取
		try {
			while(!Thread.interrupted()) {
				System.out.println(DateUtils.getNowTimestamp() + " > 没有均衡监听器的消费者等待消费消息");
				TimeUtils.sleep(1000);
				// 消费消息
				ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
				for(ConsumerRecord<String, String> rd : consumerRds) {
					System.out.println("消费者-norebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
				}
				// 【异步提交】
				consumer.commitAsync(new OffsetCommitCallbackImpl());
				if (!consumerRds.isEmpty()) break;
			}
		} finally {
			try {
				// 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误
				consumer.commitSync();
			} finally {
				// 记得关闭消费者
				consumer.close();
				System.out.println("消费者关闭");
			}
		}
	}
}

 我们可以发现,一旦消费者2消费了消息(消息不为空),就停止消费;

以便我们查看消费者2接收消息的偏移量是不是 消费者1的监听器在发生分区再均衡前提交的偏移量+1;


【4】测试

【4.1】测试步骤

step1) 运行生产者,发送消息到kafka;

step2) 运行 带有均衡监听器的消费者1 MyConsumerWithRebalanceListener, 消费消息;

在消费者订阅主题时,传入再均衡监听器;

// 订阅主题, 【传入分区再均衡监听器】
consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)
, new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));

step3)运行 不带均衡监听器的消费者2 MyConsumernorebalanceListener,消费消息;

一旦消费者2加入消费者组,就会发生分区再均衡,消费者1的某些分区所有权会转给消费者2,触发消费者1 的MyConsumerWithRebalanceListener监听器的 onPartitionsRevoked() 方法

  • 然后 onPartitionsRevoked()方法提交 消费者1处理的消息的偏移量后,就原地抛出异常停止运行;

【4.2】测试结果分析

1)消费者1(带分区再均衡监听器)的监听器最后提交的偏移量日志如下:

2021-12-12 10:23:30 > 带均衡监听器消费者等待消费消息
=== 分区再均衡触发onPartitionsRevoked()方法
提交偏移量信息,partition【0】offset【1296】
提交偏移量信息,partition【1】offset【1269】
提交偏移量信息,partition【2】offset【1269】

2)消费者2接收到的起始消息的偏移量日志如下(全部):

2021-12-12 10:23:27 > 没有均衡监听器的消费者等待消费消息
2021-12-12 10:23:32 > 没有均衡监听器的消费者等待消费消息
消费者-norebalanceListener-分区【0】offset【1296】值=[589]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1297】值=[592]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1298】值=[595]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1299】值=[598]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1300】值=[601]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1301】值=[604]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1302】值=[607]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1303】值=[610]  > ABCDE
消费者-norebalanceListener-分区【0】offset【1304】值=[613]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1269】值=[510]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1270】值=[513]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1271】值=[516]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1272】值=[519]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1273】值=[522]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1274】值=[525]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1275】值=[528]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1276】值=[531]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1277】值=[534]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1269】值=[509]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1270】值=[512]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1271】值=[515]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1272】值=[518]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1273】值=[521]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1274】值=[524]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1275】值=[527]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1276】值=[530]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1277】值=[533]  > ABCDE
消费者关闭

即 监听器提交的偏移量为:

partition【0】offset【1296】
partition【1】offset【1269】
partition【2】offset【1269】

而普通消费者接收消息的起始偏移量为

消费者-norebalanceListener-分区【0】offset【1296】值=[589]  > ABCDE
消费者-norebalanceListener-分区【2】offset【1269】值=[510]  > ABCDE
消费者-norebalanceListener-分区【1】offset【1269】值=[509]  > ABCDE

所以,偏移量是可以对上的;即再均衡监听器在发生分区再均衡前提交的消息偏移量后, 其他消费者可以接收该偏移量指定的消息;


【注意】

  • 注意1)监听器提交的偏移量是接收消息的当前偏移量+1;(注意要加1,非常重要),即加1后的偏移量作为其他消费者轮序消费的起始位置;
    • 代码:偏移量+1参见  MyConsumerWithRebalanceListener 的 接收消息的循环中的代码,如下:
 // 提交的偏移量,是 当前消息偏移量加1
topicPartitionOffsetArr[rd.partition()].setAll(
rd.topic(), rd.partition(), rd.offset() + 1); 

 

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...