有没有办法在 python 中使用多处理从 kafka 主题中读取大量消息?

问题描述

我正在尝试从最早的偏移量读取 kafka 主题,然后通过 python 脚本对某些记录进行墓碑处理。由于消息数量巨大(百万+),我想利用多处理使脚本在使用消息的同时更快。这是脚本的一个片段:

    from kafka import KafkaConsumer

    def cleanup_kafka_topic(self,env):
    # Declarations
    consumer = KafkaConsumer(<topic_name>,group_id=<some_group>),bootstrap_servers=[<kafka_host:kafka_port>],auto_offset_reset='earliest',enable_auto_commit=True)
    # Clean-up logic
    for msg in consumer:
        # Do something with the msg

我正在使用 kafka-python

解决方法

Kafka 消费者不是线程安全的(请参阅此处的线程安全部分:https://pypi.org/project/kafka-python/)。加快速度的方法是在您的主题上有多个分区并扩大消费者的数量(都具有 sae 消费者组标识符)。如果您有 N 个分区,则最多可以有 N 个消费者(每个分区最多可以有 1 个消费者)。 Kafka 将在您的消费者上升或下降时负责分配和重新分配分区,以便您可以按需扩展(例如,通过观察分区的滞后)。请注意,根据文档,这需要使用较新的 (0.9+) kafka 代理。