问题描述
我正在尝试将 Kafka 主题中的数据读入 Apache Beam 的简单示例。这是相关的片段:
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(
consumer_config={'bootstrap.servers': 'localhost:29092'},topics=['test'])
| 'Print' >> beam.Map(print))
使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在本地运行在 docker 容器中,我可以使用 kafkacat
从主机(容器外)到发布和订阅消息。所以,我想这方面没有问题。
Beam 似乎能够连接到 Kafka 并收到新消息的通知,因为我在从 kafkacat
发布数据时看到 Beam 日志中的偏移量变化:
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
这就是我使用 kafkacat
发布数据的方式:
$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar
并且我可以再次使用 kafkacat
确认收到它:
$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar
但尽管如此,我还是没有看到 Beam 像我预期的那样打印出实际的消息。任何指向这里缺少的内容的指针都表示赞赏。我怀疑这可能是 Beam 管道端的解码问题,但可能不正确。
编辑(2021 年 3 月 17 日):
在 Beam Python Kafka 连接器开发人员关注此问题后,Python ReadFromKafka
未按预期运行的根本原因是便携式 Flink 运行器无法执行无界可拆分 DoFns (SDF),因为它仅支持自检查点。便携式流式 Flink 不会定期向 SDK 发出检查点请求。这就是为什么所有 Kafka 记录都在第一个 ReadFromKafka
阶段进行缓冲的原因。跟踪此问题的 Jira 是 https://issues.apache.org/jira/browse/BEAM-11991。此外,还有另一个 Jira 正在跟踪功能请求以支持此功能:https://issues.apache.org/jira/browse/BEAM-11998。希望这会有所帮助!
解决方法
我遇到了完全相同的问题,无法弄清楚为什么 ReadKafka 操作没有按预期工作。
这就好像我需要将值“提取”到另一个 PCollection 中...
误导性的是,所有的 examples using Python & Kafka (eg: KafkaTaxi) 看起来都很简单,而在我这边却行不通
我想知道自 last release (2.28.0) 以来是否发布了错误。将检查以前版本的 apache-beam