问题描述
我正在运行一个使用 pykafka
生成消息的简单场景,但是当我同时启用 rdkafka
和消息大小相对较大时出现异常。
with topic.get_producer(use_rdkafka=True,delivery_reports=True,block_on_queue_full=True) as p:
for i in range(40000):
print(i)
p._protocol_version = 1
p.produce(str.encode(msg_str),partition_key=str(f'{i}').encode(),timestamp=datetime.datetime.Now())
while True:
try:
msg,exc = p.get_delivery_report(block=False,timeout=.1)
if exc is not None:
print('Failed to deliver msg {}: {}'.format(msg.partition_key,repr(exc)))
else:
print(f'Successfully delivered msg {msg.partition_key}')
except queue.Empty:
break
当 msg_str
只是一个短字符串时,一切正常。但是当长度大约为 30000 个字符时,生产在大约 38847 次迭代后停止,一段时间后我得到一个异常 pykafka.exceptions.ProducerQueueFullError
。
看起来我添加的 block_on_queue_full=True
标志是明确的 ignored
if rdkafka and use_rdkafka:
Cls = rdkafka.RdKafkaProducer
kwargs.pop('block_on_queue_full',None)
在我可以继续生产之前启用等待的正确顺序是什么?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)