问题描述
我有两个要通过Modbus通信访问的设备(逆变器和电度表)。 逆变器的脚本在Python 3版本和最新的Pyserial软件包上都可以很好地工作,但是电能表的脚本需要旧版本的pyserial(新的pyserial模块没有setTimeout函数(已删除))。 我的问题是是否可以将旧的脚本代码编辑到电表并更改setTimeout函数?我当时正在考虑将setTimeout函数更改为超时,但是现在无法对其进行测试(我还没有能量计)。 首先,我还要指出,我是Python的初学者,会寻求经验丰富的人的帮助。
# -*- coding: utf-8 -*-
import time
import serial
import mbusbase
from mbusbase import get16,get8,bytestr,errcode
class mbclient(object):
transportOpen = False
TOUTMSG = 1.0
TOUTCHAR = 2
def __init__(self):
pass
def getReply(self):
bstr = self.transportRead(3,self.TOUTMSG)
if bstr != '' and bstr != None:
if len(bstr) < 3:
return None
bajty = bytearray(bstr)
# print bytestr(bajty)
# find expected length
bytescnt = 0
if bajty[1] == 1 or bajty[1] == 2:
bytescnt = int(bajty[2])
elif bajty[1] == 3 or bajty[1] == 4:
bytescnt = int(bajty[2])
elif bajty[1] == 5:
bytescnt = 4
elif bajty[1] == 5 or bajty[1] == 6 or bajty[1] == 15 \
or bajty[1] == 16:
bytescnt = 4
elif bajty[1] > 127:
bytescnt = 1
bytescnt += 2
# print 'Expected bytes : ' + str(bytescnt)
bstr = self.transportRead(bytescnt,self.TOUTCHAR)
# print 'bytes : ',len(bstr)
if bstr != '':
if len(bstr) < bytescnt:
return None
bajtydata = bytearray(bstr)
# print bytestr(bajtydata)
apdu = bajty + bajtydata
rtu = mbusbase.receivedRtu(apdu)
rtu.clientParse()
return rtu
else:
return None
def transportOpen(self):
pass
def transportClose(self):
return None
def transportSend(self,rawbytes):
return None
def transportRead(self,count,timeout=None):
return None
# ---------------------------------------------------------
# error resonse
class mbclientserial(mbclient):
portdev = None
def transportOpen(
self,device,brate=9600,stopbit=1,timeout=0.9,):
self.mainTout = timeout
self.charTout = 12 * 1 / brate
self.portdev = None
try:
self.portdev = serial.Serial(device,timeout=timeout,baudrate=9600,parity=serial.PARITY_EVEN,stopbits=serial.STOPBITS_ONE)
except:
self.portdev = None
print 'ERROR: cant open serial port: ' + str(device)
return None
self.portdev.flushinput()
self.portdev.setTimeout(timeout)
self.portdev.baudrate = brate
self.portdev.parity = serial.PARITY_EVEN
self.portdev.stopbits = serial.STOPBITS_ONE
self.transportOpen = True
def transportClose(self):
try:
self.portdev.close()
except:
pass
return None
def transportSend(self,rawbytes):
if not self.transportOpen:
return None
try:
bb = bytearray(1)
for b in rawbytes:
bb[0] = b
self.portdev.write(bb)
time.sleep(0.005)
# return self.portdev.write(rawbytes)
return
except:
print "ERROR: can't write serial port "
return None
def transportRead(self,timeout):
if not self.transportOpen:
return None
try:
if timeout:
if timeout == self.TOUTMSG:
self.portdev.setTimeout(self.mainTout)
elif timeout == self.TOUTCHAR:
self.portdev.setTimeout(self.charTout * count)
return self.portdev.read(count)
except:
print "ERROR: can't read serial port "
return None
if __name__ == '__main__':
tecl = mbclientserial()
tecl.transportOpen('/dev/ttyUSB0',brate=9600)
# r03 = mbusbase.request01(1,0x3,20)
r03 = mbusbase.request03(1,1,10)
print bytestr(r03.rtu())
tecl.transportSend(r03.rtu())
reply = tecl.getReply()
if reply:
if reply.fail:
print 'dupa' + str(reply.fail)
else:
print reply.vals
# time.sleep(0.5)
pass
```
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)