问题描述
我正在尝试使用用于python的unittest包编写一个简单的测试,该测试仅检测是否存在代理连接。尽管成功建立了代理连接,但它似乎还是失败了,我90%的肯定是语法方面的问题-特别是has_connected布尔变量的定义。
import paho.mqtt.client as mqtt
import time
class TestbrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "10.0.2.4"
self.port = 1883
self.has_connected = False
def on_connect(client,userdata,flags,rc): #connect function
if rc==0:
self.has_connected = True
def test_connection(self): #test to check connection to broker
self.client.connect(self.broker,self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)
if __name__ == '__main__':
unittest.main()
任何帮助将不胜感激:)
解决方法
我复制了您的代码示例,并使用了paho.mqtt客户端提供的示例进行连接
client.connect("mqtt.eclipse.org",1883,60)
我认为您的问题可能在您的on_connect函数上,您正在引用self.has_connected,但是您没有将self传递给该函数。
这对我来说很有效,请让我知道将self添加到on_connect中是否可以解决您遇到的问题!
class TestBrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "mqtt.eclipse.org"
self.port = 1883
self.has_connected = False
def on_connect(self,client,userdata,flags,rc): # connect function
if rc == 0:
self.has_connected = True
def test_connection(self): # test to check connection to broker
self.client.connect(self.broker,self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)