pubSubSource:两次接收相同的消息 说明如何复制

问题描述

说明

  • 我在 Kafka Connect 分布式模式下有一个 pubSubSource 连接器,它只是从 PubSub 订阅中读取并写入 Kafka 主题。问题是,即使我将一条消息发布到 GCP PubSub,我也会在我的 Kafka 主题中收到此消息两次。

如何复制

  • 部署 Kafka 和 Kafka 连接

  • 使用以下 pubSubSource 配置创建连接器:

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
      "name": "pubSubSource","config": {
        "connector.class":"com.google.pubsub.kafka.source.CloudPubSubSourceConnector","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter","tasks.max":"1","cps.subscription":"pubsub-test-sub","kafka.topic":"kafka-sub-topic","cps.project":"test-project123","gcp.credentials.file.path":"/tmp/gcp-creds/account-key.json"
      }
    }'
    
  • 以下是 Kafka 连接配置:

    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components"
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    
  • 使用以下命令将消息发布到 PubSub 主题

    gcloud pubsub topics publish test-topic --message='{"someKey":"someValue"}'
    
  • 从目标 Kafka 主题读取消息:

    /usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-topic --from-beginning
    
    # Output
    {"someKey":"someValue"}
    {"someKey":"someValue"}
    

为什么会这样,是不是我做错了什么?

解决方法

我在 https://cloud.google.com/pubsub/docs/faq 找到了以下信息,您似乎遇到了同样的问题。您可以尝试生成大消息,看看结果是否相同?

来自链接的详细信息:

为什么重复消息太多? Pub/Sub 保证至少一次消息传递,这意味着偶尔会出现重复。但是,高重复率可能表明客户端未在配置的 ack_deadline_seconds 内确认消息,并且 Pub/Sub 正在重试消息传递。这可以在请求订阅的监控指标 pubsub.googleapis.com/subscription/pull_ack_message_operation_count 和推送订阅的 pubsub.googleapis.com/subscription/push_request_count 中观察到。在 /response_code 中查找提升的 expired 或 webhook_timeout 值。如果有很多小消息,这种情况尤其可能发生,因为 Pub/Sub 可能会在内部对消息进行批处理,并且部分确认的批处理将被完全重新传送。

另一种可能是订阅者没有确认某些消息,因为处理这些特定消息的代码路径失败,并且从未进行 Acknowledge 调用;或者推送端点永远不会响应或响应错误。

如何检测重复消息? Pub/Sub 为每条消息分配一个唯一的 message_id,可用于检测订阅者收到的重复消息。但是,这不会允许您检测由对同一数据的多个发布请求产生的重复项。检测这些将需要发布者提供唯一的消息标识符。如需进一步讨论,请参阅 Pub/Sub I/O。