Apache Kafka Streams:乱序消息

问题描述

我有一个写入主题 A (TA) 的 Apache Kafka 2.6 Producer。 我还有一个 Kafka 流应用程序,它从 TA 消费并写入主题 B (TB)。 在流应用程序中,我有一个自定义时间戳提取器,它从消息负载提取时间戳。

对于我的故障处理测试用例之一,我在应用程序运行时关闭了 Kafka 集群。

当生产者应用程序尝试将消息写入 TA 时,它无法写入,因为集群已关闭,因此(我假设)缓冲了消息。 假设它按时间递增的顺序接收 4 条消息 m1,m2,m3,m4。 (即 m1 是第一个,m4 是最后一个)。

当我将 Kafka 集群重新联机时,生产者将缓冲的消息发送到主题,但它们没有按顺序排列。例如,我收到 m2,然后是 m3,然后是 m1,然后是 m4。

这是为什么?是不是因为生产者中的缓冲是多线程的,每个同时生产到主题

我认为自定义时间戳提取器将有助于在使用消息时对消息进行排序。但他们没有。或者我对时间戳提取器的理解是错误的。

我从 SO here 获得了一个解决方案,将所有事件从 tA 流式传输到另一个中间主题(比如 tA'),该中间主题将使用时间戳提取器传输到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。

我的 Producer 代码如下所示(我使用 Spring Cloud 来创建 Producer): Producer.java

@Service
public class Producer {

    private String topicName = "input-topic";
        
    private ApplicationProperties appProps;
    
    @Autowired
    private KafkaTemplate<String,MyEvent> kafkaTemplate;
    
    public Producer() {
        super();        
    }
    
    @Autowired
    public void setAppProps(ApplicationProperties appProps) {
        this.appProps = appProps;
        this.topicName = appProps.getinput().getTopicName();
    }

    public void sendMessage(String key,MyEvent ce) {
        ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName,key,ce); 
        
    }
}

解决方法

这是为什么?是不是因为生产者中的缓冲是多线程的,每个同时生产到主题?

默认情况下,生产者最多允许向代理发送 5 个并行的动态请求,因此如果某些请求失败并重试,请求顺序可能会改变。

为避免这种重新排序问题,您可以设置 max.in.flight.requests.per.connection = 1(可能会影响性能)或设置 enable.idempotence = true

顺便说一句:您没有说您的主题是有单个分区还是多个分区,以及您的消息是否有键?如果您的主题有多个分区,并且您的消息被发送到不同的分区,则无论如何都无法保证读取顺序,因为偏移量排序仅在一个分区内得到保证。

我认为自定义时间戳提取器将有助于在使用消息时对消息进行排序。但他们没有。或者我对时间戳提取器的理解是错误的。

时间戳提取器只提取时间戳。 Kafka Streams 不会对任何消息重新排序,而是始终按偏移顺序处理消息。

如果不是,那么时间戳提取器的具体用途是什么?只是为了将时间戳与事件相关联?

正确。

我从 SO here 得到了一个解决方案,将所有事件从 tA 流式传输到另一个中间主题(比如 tA'),该主题将使用时间戳提取器传输到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。

不,它不会进行任何重新排序。另一个 SO 问题即将更改时间戳,但是如果您按 a、b、c 的顺序读取消息,结果将按 a、b、c 的顺序写入(只是使用不同的时间戳,但应保留偏移顺序)。

这个演讲解释了更多细节:https://github.com/matomo-org/matomo-log-analytics/issues

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...