问题描述
使用ConsumerRebalanceListener
启动aiokafka使用者时,我们对从kafka接收到的这批消息(getmany())进行了处理。我们还将这个处理过程添加到on_partitions_revoked
中,以确保在发生重新平衡时这些处理将完成。
当发生重新平衡并且同时处理过程已经发生时,on_partitions_revoked
将再次调用处理过程并且可以处理两次消息,为避免这种情况,我们在此过程开始时添加了锁定。
我实际上不确定在这种情况下我们是否真的需要锁,因此如果这里的人可以对此提供建议,将不胜感激。当我们在这种情况下使用aiokafka时,我想这可能是Kafka的一个普遍问题。
class KafkaWrapper(ConsumerRebalanceListener):
def __init__(
self,consumer_bootstrap_servers: List[str],consumer_topic: str,consumer_group_id: str,):
self.records_lock = asyncio.Lock()
self.kafka_consumer = AIOKafkaConsumer(
bootstrap_servers=consumer_bootstrap_servers,group_id=consumer_group_id,)
self.kafka_consumer.subscribe(topics=[consumer_topic],listener=self)
async def process_records(self):
async with self.records_lock: # Is this lock really required??
# processing message
async def on_partitions_revoked(self,_revoked):
await self.process_records()
async def on_partitions_assigned(self,_assigned):
pass
async def _watch_kafka(self):
await self.kafka_consumer.start()
while not self.should_stop.is_set():
messages = await self.kafka_consumer.getmany()
if len(local_records_by_topic_partition) > 0:
async with self.records_lock:
self.messages = messages
await self.process_records()
解决方法
当然应该使用 asyncio.lock
如果您不使用 lock
,则可能会在过程中间发生撤销。
这可能会对您的实施产生意想不到的影响。
更糟糕的是,如果进程中有commit方法,则可能出现以下场景:
进程启动 -> 撤销启动 -> 提交 -> 错误!
发生错误是因为消费者失去了对分区的权限。
因此,通过使用 lock
,撤销应该在进程完全完成后发生。