Apache Pulsar-按时间戳显示Consumer.seek方法的行为是什么?

问题描述

https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-

在使用者上调用seek(long timestamp)方法时,时间戳是否必须等于消息发布的确切时间? 例如,如果我在t = 1、5、7处发送了三则消息,并且如果我致电Consumer.seek(3),我会收到错误消息吗?还是我的消费者将重置为t = 3,以便如果我调用Consumer.next(),我会收到第二条消息?

预先感谢

解决方法

通过Consumer#seek(long timestamp),您可以将订阅重置为给定的时间戳。搜索之后,使用者将开始接收发布时间等于或大于传递给seek方法的时间戳的消息。

以下示例显示了如何将使用者重置为前一个小时:

try (
    // Create PulsarClient
    PulsarClient client = PulsarClient
        .builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
    // Create Consumer subscription
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionMode(SubscriptionMode.Durable)
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
        .subscribe()
) {
    // Seek consumer to previous hour
    consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
    while (true) {
        final Message<String> msg = consumer.receive();
        System.out.printf(
            "Message received: key=%s,value=%s,topic=%s,id=%s%n",msg.getKey(),msg.getValue(),msg.getTopicName(),msg.getMessageId().toString());
        consumer.acknowledge(msg);
    }
}

请注意,如果您有多个属于同一下标的使用者(例如,Key_Shared),则所有使用者都将被重置。