如何在Kafka侦听器方法中寻找特定的偏移量?

问题描述

我正在尝试使用我的kafka侦听器方法从SQL数据库中寻求偏移量。 我在代码中使用了registerSeekCallback方法,但是在运行使用者(或启动容器)时会调用此方法。假设我的使用者正在运行,并且在MySql数据库中最后提交的偏移量为20。我手动将Mysql数据库中最后提交的偏移量更改为11,但是除非重新启动我的使用者(容器已重新启动),否则我的使用者将继续从21读取数据。我正在寻找任何选项,如果我可以在侦听器方法本身中覆盖或查找偏移量。任何帮助将不胜感激。

public class Listen implements ConsumerSeekAware 
{
 @Override
    public void registerSeekCallback(ConsumerSeekCallback callback)

    {
//      fetching offset from a database 
        Integer offset = offsetService.getOffset();
        callback.seek("topic-name",offset);

    }
 @KafkaListener(topics = "topic-name",groupId = "group")
  public void listen(ConsumerRecord record Acknowledgment acknowledgment) throws Exception 
  {
//    processing the record 

      acknowledgment.acknowledge();    //manually commiting the record
//    committing the offset to MySQL database
  }
}

使用新的侦听器方法进行编辑:-

@KafkaListener(topics = "topic-name",groupId = "group")
  public void listen(ConsumerRecord record Acknowledgment acknowledgment,@Header(KafkaHeaders.CONSUMER) Consumer<?,?> consumer)) throws Exception {
       // seeking old offset stored in database (which is 11 )
        consumer.seek(partition,offsetService.getOffset());
        log.info("record offset is {} and value is {}",record.offset(),record.value() );
        acknowledgment.acknowledge();
}

在数据库中,我最后一次提交的偏移量是11,kafka结束时的最后提交的偏移量是21。当我在kafka主题中写了一条新记录(即在offset 22上)时,我的使用者首先触发并处理22 offset,然后返回查找偏移量11并从那里开始处理。 为什么我在寻求偏移量11时为什么先消耗偏移量22?

使用上面的代码,每次我向kafka顶部写入新消息时,它首先处理记录,然后查找数据库中存在的偏移量。我有什么办法可以避免这种情况?

解决方法

Locator Strategies中有几种技术。

请记住,在下一次民意调查之前,对消费者执行搜索将不会生效(在上次民意调查中获取的所有记录都将首先发送给消费者)。

编辑

这是一个例子:

@SpringBootApplication
public class So63429201Application {

    public static void main(String[] args) {
        SpringApplication.run(So63429201Application.class,args).close();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String,String> template,Listener listener) {
        return args -> {
            IntStream.range(0,10).forEach(i -> template.send("so63429201",i % 3,null,"foo" + i));
            Thread.sleep(8000);
            listener.seekToTime(System.currentTimeMillis() - 11000);
            Thread.sleep(8000);
            listener.seekToOffset(new TopicPartition("so63429201",0),11);
            Thread.sleep(8000);
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63429201").partitions(3).replicas(1).build();
    }

}

@Component
class Listener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "so63429201",topics = "so63429201",concurrency = "2")
    public void listen(String in) {
        System.out.println(in);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition,Long> assignments,ConsumerSeekCallback callback) {
        System.out.println(assignments);
        super.onPartitionsAssigned(assignments,callback);
        callback.seekToBeginning(assignments.keySet());
    }

    public void seekToTime(long time) {
        getSeekCallbacks().forEach((tp,callback) -> callback.seekToTimestamp(tp.topic(),tp.partition(),time));
    }

    public void seekToOffset(TopicPartition tp,long offset) {
        getSeekCallbackFor(tp).seek(tp.topic(),offset);
    }

}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...