kafka新版API

原文链接https://blog.csdn.net/Simon_09010817/article/details/83750115

接上篇 Kafka新版消费者API示例(一)https://blog.csdn.net/Simon_09010817/article/details/83748974

kafka手动提交策略提供了更加灵活的管理方式,在某些场景我们需要对消费偏移量有更精准的管理。以保证消息不被重复消费以及消息不丢失。

Kafka提供两种手动提交方式:

1.异步提交(commitAsync):

   异步模式下,提交失败也不会尝试提交。消费者线程不会被阻塞,因为异步操作,可能在提交偏移量操作结果未返回时就开始下一次拉取操作。

2.同步提交(CommitSync):

    同步模式下,提交失败时一直尝试提交,直到遇到无法重试才结束。同步方式下,消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误

实现手动提交前需要在创建消费者时关闭自动提交,设置enable.auto.commit=false。

由于异步提交不会等消费偏移量提交成功后再拉取下一次消息,因此异步提交提供了一个偏移量提交回调方法commitAsync(OffsetCommitCallback callback)。提交偏移量完成之后会回调OffsetCommitCallback接口的onComplete()方法

示例代码

  1. package com.simon.kafka.consumer.newconsumer;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import java.util.*;
  5. /**
  6. * Created by Simon on 2018/11/5.
  7. */
  8. public class KafkaConsumerAsync {
  9. public static void main(String[] args) throws InterruptedException {
  10. // 1、准备配置文件
  11. String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
  12. Properties props = new Properties();
  13. //kafka连接信息
  14. props.put("bootstrap.servers",kafkas);
  15. //消费者组id
  16. props.put("group.id", "test_group");
  17. //是否自动提交offset
  18. props.put("enable.auto.commit", "false");
  19. //在没有offset的情况下采取的拉取策略
  20. props.put("auto.offset.reset", "none");
  21. //自动提交时间间隔
  22. props.put("auto.commit.interval.ms", "1000");
  23. //设置一次fetch请求取得的数据最大为1k
  24. props.put("fetch.max.bytes", "1024");
  25. //key反序列化
  26. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  27. //value反序列化
  28. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  29. String topic = "test";
  30. // 2、创建KafkaConsumer
  31. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  32. // 3、订阅数据,不给定监听器
  33. consumer.subscribe(Collections.singleton(topic));
  34. try{
  35. //最少处理100条
  36. int minCommitSize = 100;
  37. //定义计数器
  38. int icount = 0;
  39. // 4、获取数据
  40. while (true) {
  41. ConsumerRecords<String, String> records = consumer.poll(100);
  42. for (ConsumerRecord<String, String> record : records) {
  43. System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
  44. icount++;
  45. }
  46. Thread.sleep(5000);
  47. //在业务逻辑处理成功后提交offset
  48. if(icount >= minCommitSize){
  49. //满足最少消费100条,再进行异步提交
  50. consumer.commitAsync(new OffsetCommitCallback() {
  51. @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  52. if(exception == null){
  53. System.out.println("commit success");
  54. }else {
  55. //提交失败,对应处理
  56. System.out.println("commit Failed");
  57. }
  58. }
  59. });
  60. //计数器归零
  61. icount = 0 ;
  62. }
  63. }
  64. }catch (Exception e){
  65. e.printstacktrace();
  66. }finally {
  67. //关闭连接
  68. consumer.close();
  69. }
  70. }
  71. }

以时间戳查询消息:

    Kafka在0.10.1.1版本上增加了时间戳索引文件。Kafka消费者API提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,参数为一个map对象,key为待查询的分区,value为待查询的时间戳。会返回一个大于等于该事件戳的第一条消息对应的偏移量和时间戳。若待查询分区不存在,会一直阻塞。

 示例:

   将kafka-client的maven依赖改为1.0.0 。在0.10.0.1中无法引入OffsetAndTimestamp类

  1. <!--引入kafka-clients-->
  2. <!--<dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>0.10.0.1</version>
  6. </dependency>-->
  7. <dependency>
  8. <groupId>org.apache.kafka</groupId>
  9. <artifactId>kafka-clients</artifactId>
  10. <version>1.0.0</version>
  11. </dependency>

代码

  1. package com.simon.kafka.consumer.newconsumer;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import java.util.Collections;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. import java.util.Properties;
  8. /**
  9. * Created by Simon on 2018/11/5.
  10. */
  11. public class KafkaConsumerTimestamps {
  12. public static void main(String[] args) throws InterruptedException {
  13. // 1、准备配置文件
  14. String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
  15. Properties props = new Properties();
  16. //kafka连接信息
  17. props.put("bootstrap.servers",kafkas);
  18. //消费者组id
  19. props.put("group.id", "test_group");
  20. //客户端id
  21. props.put("client.id", "test_group");
  22. //是否自动提交offset
  23. props.put("enable.auto.commit", "true");
  24. //在没有offset的情况下采取的拉取策略
  25. props.put("auto.offset.reset", "none");
  26. //自动提交时间间隔
  27. props.put("auto.commit.interval.ms", "1000");
  28. //设置一次fetch请求取得的数据最大为1k
  29. props.put("fetch.max.bytes", "1024");
  30. //key反序列化
  31. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  32. //value反序列化
  33. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  34. String topic = "test";
  35. // 2、创建KafkaConsumer
  36. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  37. // 3、订阅主题
  38. TopicPartition topicPartition = new TopicPartition(topic,0);
  39. consumer.assign(Collections.singleton(topicPartition));
  40. try{
  41. Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
  42. // 设置查询12 小时之前消息的偏移量
  43. timestampsToSearch.put(topicPartition, (System.currentTimeMillis() - 12 * 3600 * 1000));
  44. // 会返回时间大于等于查找时间的一个偏移量
  45. Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampsToSearch);
  46. OffsetAndTimestamp offsetTimestamp = null;
  47. // 用for 轮询,当然由于本例是查询一个分区,因此也可以用if 处理
  48. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
  49. // 若查询时间大于时间戳索引文件中最大记录索引时间,
  50. // 此时value 为空,即待查询时间点之后没有新消息生成
  51. offsetTimestamp = entry.getValue();
  52. if (null != offsetTimestamp) {
  53. // 重置消费起始偏移量
  54. consumer.seek(topicPartition, entry.getValue().offset());
  55. }
  56. }
  57. while (true) {
  58. //4.轮询拉取消息
  59. ConsumerRecords<String, String> records = consumer.poll(100);
  60. for (ConsumerRecord<String, String> record : records) {
  61. System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
  62. }
  63. }
  64. }catch (Exception e){
  65. e.printstacktrace();
  66. }finally {
  67. //关闭连接
  68. consumer.close();
  69. }
  70. }
  71. }

由于集群环境已选型为kafka0.10.0.1,本次无法按指定时间戳拉取,报错信息为不支持当前broker版本。

速度控制:

   应用场景中我们可能需要暂停某些分区消费,先消费其他分区,当达到某个条件再恢复该分区消费。

Kafka提供两种方法用于速度控制的方法

     1.pause(Collection<TopicPartition> partitions):暂停某些分区在拉取操作时返回数据给客户端

  1. //无返回值
  2. consumer.pause(Collections.singleton(topicPartition));

     2.resume(Collection<TopicPartition> partitions):恢复某些分区向客户端返回数据

  1. //无返回值
  2. consumer.resume(Collections.singleton(topicPartition));

 

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...