问题描述
https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-
在使用者上调用seek(long timestamp)方法时,时间戳是否必须等于消息发布的确切时间? 例如,如果我在t = 1、5、7处发送了三则消息,并且如果我致电Consumer.seek(3),我会收到错误消息吗?还是我的消费者将重置为t = 3,以便如果我调用Consumer.next(),我会收到第二条消息?
预先感谢
解决方法
通过Consumer#seek(long timestamp)
,您可以将订阅重置为给定的时间戳。搜索之后,使用者将开始接收发布时间等于或大于传递给seek
方法的时间戳的消息。
以下示例显示了如何将使用者重置为前一个小时:
try (
// Create PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create Consumer subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// Seek consumer to previous hour
consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"Message received: key=%s,value=%s,topic=%s,id=%s%n",msg.getKey(),msg.getValue(),msg.getTopicName(),msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
请注意,如果您有多个属于同一下标的使用者(例如,Key_Shared),则所有使用者都将被重置。