Apache Beam Python SDK ReadFromKafka 不接收数据

问题描述

我正在尝试将 Kafka 主题中的数据读入 Apache Beam 的简单示例。这是相关的片段:

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},topics=['test'])
        | 'Print' >> beam.Map(print))

使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在本地运行在 docker 容器中,我可以使用 kafkacat 从主机(容器外)到发布和订阅消息。所以,我想这方面没有问题。

Beam 似乎能够连接到 Kafka 并收到新消息的通知,因为我在从 kafkacat 发布数据时看到 Beam 日志中的偏移量变化:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

这就是我使用 kafkacat 发布数据的方式:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

并且我可以再次使用 kafkacat 确认收到它:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar

但尽管如此,我还是没有看到 Beam 像我预期的那样打印出实际的消息。任何指向这里缺少的内容的指针都表示赞赏。我怀疑这可能是 Beam 管道端的解码问题,但可能不正确。

编辑(2021 年 3 月 17 日):

在 Beam Python Kafka 连接器开发人员关注此问题后,Python ReadFromKafka 未按预期运行的根本原因是便携式 Flink 运行器无法执行无界可拆分 DoFns (SDF),因为它仅支持自检查点。便携式流式 Flink 不会定期向 SDK 发出检查点请求。这就是为什么所有 Kafka 记录都在第一个 ReadFromKafka 阶段进行缓冲的原因。跟踪此问题的 Jira 是 https://issues.apache.org/jira/browse/BEAM-11991。此外,还有另一个 Jira 正在跟踪功能请求以支持功能https://issues.apache.org/jira/browse/BEAM-11998。希望这会有所帮助!

解决方法

我遇到了完全相同的问题,无法弄清楚为什么 ReadKafka 操作没有按预期工作。

这就好像我需要将值“提取”到另一个 PCollection 中...

误导性的是,所有的 examples using Python & Kafka (eg: KafkaTaxi) 看起来都很简单,而在我这边却行不通

我想知道自 last release (2.28.0) 以来是否发布了错误。将检查以前版本的 apache-beam