问题描述
我正在编写使用confluent_kafka api(内部使用librdkafka)计算kafka吞吐量的python应用程序,但是在此过程中,我观察到,即使我传递了虚拟代理名称,应用程序也不会抛出任何错误,因此缺少了东西或这是api中的错误。还有什么更好的方法来解决这个问题?
import confluent_kafka
import json
import argparse
import time
def cmd_line_parse():
parser = argparse.ArgumentParser("Arg Parser")
parser.add_argument('-j','--json_config',help='command line argument for kafka configuration',required=True)
parser.add_argument('-t','--topic_name',help='kafka topic name',required=True)
parser.add_argument('-c','--msg_count',help='messages that you want to send to kafka topic for calculating throughput',default=500000,required=False)
parser.add_argument('-s','--msg_size',help='message size(in bytes) against which you want to test',default=1024,required=False)
args = parser.parse_args()
return args
def print_throughput_stats(time_taken,msg_count,msg_size):
print("Processed {0} messsages in {1:.2f} seconds".format(msg_count,time_taken))
print("Throughput: {0:.2f} MB/s".format((msg_size*msg_count)/time_taken/(1024*1024)))
def main():
args = cmd_line_parse()
kafka_config_dict = json.loads(args.json_config)
kafka_topic_name = args.topic_name
producer = confluent_kafka.Producer(**kafka_config_dict)
msg_payload = 'K'*args.msg_size
start_time = time.time()
for i in xrange(args.msg_count):
producer.produce(kafka_topic_name,value=msg_payload)
producer.poll(0)
producer.flush()
end_time = time.time()
print_throughput_stats(end_time-start_time,args.msg_count,args.msg_size)
if __name__ == "__main__":
try:
main()
except Exception as e:
print("Exception Occured: {}".format(str(e)))
当我通过不正确的经纪人测试此应用程序时,没有错误/异常:
python check_throughput.py -t perf_test -j '{"bootstrap.servers":"dummy.com"}'
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)