KafkaConsumer.commitAsync行为的偏移量比以前低 如何复制如何避免使用commitAsync提交较低的偏移量

问题描述

kafka将如何处理对的呼叫

KafkaConsumer.commitAsync(Map<TopicPartition,OffsetAndMetadata> offsets,OffsetCommitCallback callback)

给定主题的偏移量值小于上一次调用时的值?

解决方法

它将简单地将分区的偏移量设置为您指定的值,因此下次您将消耗来自commitedOffset + 1的消息。
commitAsync()的Javadoc说:

提交的偏移量应该是应用程序将使用的下一条消息,即lastProcessedMessageOffset +1。

,

我很好奇,并对其进行了测试以查看其行为。正如文档中所写,@ haoyuwang在其答案(+1)中写的是正确的。

其背后的原因很简单。使用者组的已提交偏移量存储在内部主题__consumer_offsets中的Kafka中。该主题为compact,这意味着它旨在为给定密钥提供最新值。在您的情况下,键是使用者组,主题和分区的组合,而您的值是偏移量。

如果您现在

  • 提交偏移量10,并且由于稍后进行异步处理
  • 提交偏移量5

偏移量5将是__consumer_offsets主题中的最新值。这意味着您的消费者将从该主题分区读取的下一个偏移量是偏移量6,而不是偏移量11。

如何复制

您可以简单地重现它并通过在常规提交之后(同步)提交更早的偏移量来对其进行测试,如下所示:

consumer.commitSync();
consumer.commitSync(commitFirstMessage);

其中commitFirstMessage被定义为

TopicPartition zeroTopicPartition = new TopicPartition(topic,0);
OffsetAndMetadata zeroOffset = new OffsetAndMetadata(0L);

Map<TopicPartition,OffsetAndMetadata> commitFirstMessage = new HashMap<>();
commitFirstMessage.put(zeroTopicPartition,zeroOffset);

编辑:

如何避免使用commitAsync提交较低的偏移量

在书Kafka - The Definitive Guide中,建议避免由于重试调用commitAsync而提交较低的偏移量:

重试异步提交:获得异步重试正确的提交顺序的一种简单模式是使用单调递增的序列号。每次提交时增加序列号,并添加 提交到commitAsync回调时的序列号。当您准备发送重试时,请检查回调获得的提交序列号是否与实例变量相等;如果是,则没有较新的提交,可以重试。如果实例序列号更高,请不要重试,因为已经发送了新的提交。

一个实现可能看起来像这样(未经实际测试!):

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig,ConsumerRecord,KafkaConsumer,OffsetAndMetadata,OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException,TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG,"AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String,String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition,OffsetAndMetadata],exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}