问题描述
我正在尝试从最早的偏移量读取 kafka 主题,然后通过 python 脚本对某些记录进行墓碑处理。由于消息数量巨大(百万+),我想利用多处理使脚本在使用消息的同时更快。这是脚本的一个片段:
from kafka import KafkaConsumer
def cleanup_kafka_topic(self,env):
# Declarations
consumer = KafkaConsumer(<topic_name>,group_id=<some_group>),bootstrap_servers=[<kafka_host:kafka_port>],auto_offset_reset='earliest',enable_auto_commit=True)
# Clean-up logic
for msg in consumer:
# Do something with the msg
我正在使用 kafka-python。
解决方法
Kafka 消费者不是线程安全的(请参阅此处的线程安全部分:https://pypi.org/project/kafka-python/)。加快速度的方法是在您的主题上有多个分区并扩大消费者的数量(都具有 sae 消费者组标识符)。如果您有 N 个分区,则最多可以有 N 个消费者(每个分区最多可以有 1 个消费者)。 Kafka 将在您的消费者上升或下降时负责分配和重新分配分区,以便您可以按需扩展(例如,通过观察分区的滞后)。请注意,根据文档,这需要使用较新的 (0.9+) kafka 代理。