aiokafka:在考虑用户组对kafka的重新平衡时,是否需要锁定才能处理消息

问题描述

使用ConsumerRebalanceListener启动aiokafka使用者时,我们对从kafka接收到的这批消息(getmany())进行了处理。我们还将这个处理过程添加到on_partitions_revoked中,以确保在发生重新平衡时这些处理将完成。

当发生重新平衡并且同时处理过程已经发生时,on_partitions_revoked将再次调用处理过程并且可以处理两次消息,为避免这种情况,我们在此过程开始时添加了锁定。

我实际上不确定在这种情况下我们是否真的需要锁,因此如果这里的人可以对此提供建议,将不胜感激。当我们在这种情况下使用aiokafka时,我想这可能是Kafka的一个普遍问题。

class KafkaWrapper(ConsumerRebalanceListener):
    def __init__(
        self,consumer_bootstrap_servers: List[str],consumer_topic: str,consumer_group_id: str,):
        self.records_lock = asyncio.Lock()  
        
        self.kafka_consumer = AIOKafkaConsumer(
            bootstrap_servers=consumer_bootstrap_servers,group_id=consumer_group_id,)
        self.kafka_consumer.subscribe(topics=[consumer_topic],listener=self) 

    async def process_records(self):
        async with self.records_lock:  # Is this lock really required??
            # processing message

    async def on_partitions_revoked(self,_revoked):
        await self.process_records()

    async def on_partitions_assigned(self,_assigned):
        pass

    async def _watch_kafka(self):
        await self.kafka_consumer.start()

        while not self.should_stop.is_set():
            messages = await self.kafka_consumer.getmany()
            
            if len(local_records_by_topic_partition) > 0:
                async with self.records_lock:
                    self.messages = messages
                await self.process_records()

解决方法

当然应该使用 asyncio.lock

如果您不使用 lock,则可能会在过程中间发生撤销。 这可能会对您的实施产生意想不到的影响。 更糟糕的是,如果进程中有commit方法,则可能出现以下场景:

进程启动 -> 撤销启动 -> 提交 -> 错误!

发生错误是因为消费者失去了对分区的权限。

因此,通过使用 lock,撤销应该在进程完全完成后发生。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...