当队列已满时,带有 rdkafka 的 Pykafka 不会阻塞生产者

问题描述

我正在运行一个使用 pykafka 生成消息的简单场景,但是当我同时启用 rdkafka 消息大小相对较大时出现异常。

我的代码与示例非常相似here

with topic.get_producer(use_rdkafka=True,delivery_reports=True,block_on_queue_full=True) as p:
    for i in range(40000):
        print(i)
        p._protocol_version = 1
        p.produce(str.encode(msg_str),partition_key=str(f'{i}').encode(),timestamp=datetime.datetime.Now())
        while True:
            try:
                msg,exc = p.get_delivery_report(block=False,timeout=.1)
                if exc is not None:
                    print('Failed to deliver msg {}: {}'.format(msg.partition_key,repr(exc)))
                else:
                    print(f'Successfully delivered msg {msg.partition_key}')
            except queue.Empty:
                break

msg_str 只是一个短字符串时,一切正常。但是当长度大约为 30000 个字符时,生产在大约 38847 次迭代后停止,一段时间后我得到一个异常 pykafka.exceptions.ProducerQueueFullError

看起来我添加block_on_queue_full=True 标志是明确的 ignored

    if rdkafka and use_rdkafka:
        Cls = rdkafka.RdKafkaProducer
        kwargs.pop('block_on_queue_full',None)

在我可以继续生产之前启用等待的正确顺序是什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)