问题描述
我在Python脚本中有以下代码
from kafka import KafkaProducer
kafka_producer = KafkaProducer(....)
kafka_producer.send(topic,value=message)
kafka_producer.flush()
logger.info('Done!') # this message is displayed
但是,我仍然看到以下消息。消息似乎已成功发送。为什么即使调用flush()
也会显示“ 0秒超时”消息?
INFO:root:完成!
INFO:kafka.producer.kafka:以0秒超时关闭Kafka生产者。
INFO:kafka.producer.kafka:由于无法在超时0内完成挂起的请求,因此强制关闭生产者。
INFO:kafka.conn:
:正在关闭连接。
解决方法
在这种情况下,我认为错误消息不正确。相关代码-source on github
if timeout > 0:
if invoked_from_callback:
log.warning("Overriding close timeout %s secs to 0 in order to"
" prevent useless blocking due to self-join. This"
" means you have incorrectly invoked close with a"
" non-zero timeout from the producer call-back.",timeout)
else:
# Try to close gracefully.
if self._sender is not None:
self._sender.initiate_close()
self._sender.join(timeout)
if self._sender is not None and self._sender.is_alive():
log.info("Proceeding to force close the producer since pending"
" requests could not be completed within timeout %s.",timeout)
self._sender.force_close()
我对代码的解释:
如果timeout
为0
,我们将跳过优美的关闭代码,直接进入强制关闭。记录之前在_sender
上检查的唯一条件是它存在并且is_alive()。当然存在,并且它还活着,因为它从未被告知要关闭。
如果timeout
为0
,它从不检查是否可以完成任何事情。因此,在这种情况下,日志记录是不正确的。
在timeout > 0
的情况下,日志记录才有意义。 initiate_close()
和join()一样被调用,后者表示您只能稍后再检查is_alive()
才能知道加入是否成功。如果尝试join()
后它仍然存在,则将其强制关闭,并且无法在超时时间内完成请求。