Kafka:Sarama、幂等性和 transactional.id

问题描述

Shopify/sarama 是否提供类似于 JVM API 中的 transactional.id 的选项?

该库支持幂等(Config.Producer.Idemponent,类似于enable.idempotence),但我不明白没有transactional.id如何使用它。

如果我错了,请纠正我,Sarama 中缺少有关这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。

我在源代码和一些测试(for example)中找到了相关属性,但不明白如何在外部使用它们。

解决方法

Shopify/sarama 为 Kafka 提供 Exactly Once (Idempotency) 和启用幂等的生产者。但是对于下面的配置设置需要在那里。

来自Shopify/sarama/config.go

    if c.Producer.Idempotent {
        if !c.Version.IsAtLeast(V0_11_0_0) {
            return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
        }
        if c.Producer.Retry.Max == 0 {
            return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
        }
        if c.Producer.RequiredAcks != WaitForAll {
            return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
        }
        if c.Net.MaxOpenRequests > 1 {
            return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
        }
    }

Shopify/sarama 中,他们是如何做到这一点的,producerEpochAsyncProducer 中有一个 transactionManager ID。您可以在 Shopify/sarama/async_producer.go 中引用该文件。此 Id 使用生产者初始化进行初始化,并在成功生成每条消息时递增。读取 bumpEpoch() 函数以在 async_producer.go 文件中查看。

这是该生产者与代理会话的序列 ID,它与每条消息一起发送。消息发布成功时递增。

阅读this example。它描述了幂等性的工作原理。

您对生产者会话事实是正确的。这正是曾经为单个生产者会话所承诺的。在序列失败后立即重新启动生产者时,可能会出现重复。

当生产者重新启动时,新的 PID 被分配。因此,幂等性仅适用于单个生产者会话。即使生产者在失败时重试请求,每条消息也只会在日志中保留一次。根据生产者获取数据的来源,仍然可能存在重复项。 Kafka 不会处理生产者收到的重复数据。因此,在某些情况下,您可能需要额外的重复数据删除系统。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...