问题描述
我有一个写入主题 A (TA) 的 Apache Kafka 2.6 Producer。 我还有一个 Kafka 流应用程序,它从 TA 消费并写入主题 B (TB)。 在流应用程序中,我有一个自定义时间戳提取器,它从消息负载中提取时间戳。
对于我的故障处理测试用例之一,我在应用程序运行时关闭了 Kafka 集群。
当生产者应用程序尝试将消息写入 TA 时,它无法写入,因为集群已关闭,因此(我假设)缓冲了消息。 假设它按时间递增的顺序接收 4 条消息 m1,m2,m3,m4。 (即 m1 是第一个,m4 是最后一个)。
当我将 Kafka 集群重新联机时,生产者将缓冲的消息发送到主题,但它们没有按顺序排列。例如,我收到 m2,然后是 m3,然后是 m1,然后是 m4。
这是为什么?是不是因为生产者中的缓冲是多线程的,每个同时生产到主题?
我认为自定义时间戳提取器将有助于在使用消息时对消息进行排序。但他们没有。或者我对时间戳提取器的理解是错误的。
我从 SO here 获得了一个解决方案,将所有事件从 tA 流式传输到另一个中间主题(比如 tA'),该中间主题将使用时间戳提取器传输到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。
我的 Producer 代码如下所示(我使用 Spring Cloud 来创建 Producer): Producer.java
@Service
public class Producer {
private String topicName = "input-topic";
private ApplicationProperties appProps;
@Autowired
private KafkaTemplate<String,MyEvent> kafkaTemplate;
public Producer() {
super();
}
@Autowired
public void setAppProps(ApplicationProperties appProps) {
this.appProps = appProps;
this.topicName = appProps.getinput().getTopicName();
}
public void sendMessage(String key,MyEvent ce) {
ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName,key,ce);
}
}
解决方法
这是为什么?是不是因为生产者中的缓冲是多线程的,每个同时生产到主题?
默认情况下,生产者最多允许向代理发送 5 个并行的动态请求,因此如果某些请求失败并重试,请求顺序可能会改变。
为避免这种重新排序问题,您可以设置 max.in.flight.requests.per.connection = 1
(可能会影响性能)或设置 enable.idempotence = true
。
顺便说一句:您没有说您的主题是有单个分区还是多个分区,以及您的消息是否有键?如果您的主题有多个分区,并且您的消息被发送到不同的分区,则无论如何都无法保证读取顺序,因为偏移量排序仅在一个分区内得到保证。
我认为自定义时间戳提取器将有助于在使用消息时对消息进行排序。但他们没有。或者我对时间戳提取器的理解是错误的。
时间戳提取器只提取时间戳。 Kafka Streams 不会对任何消息重新排序,而是始终按偏移顺序处理消息。
如果不是,那么时间戳提取器的具体用途是什么?只是为了将时间戳与事件相关联?
正确。
我从 SO here 得到了一个解决方案,将所有事件从 tA 流式传输到另一个中间主题(比如 tA'),该主题将使用时间戳提取器传输到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。
不,它不会进行任何重新排序。另一个 SO 问题即将更改时间戳,但是如果您按 a、b、c 的顺序读取消息,结果将按 a、b、c 的顺序写入(只是使用不同的时间戳,但应保留偏移顺序)。
这个演讲解释了更多细节:https://github.com/matomo-org/matomo-log-analytics/issues