新的pyserial软件包中没有setTimeout函数python

问题描述

我有两个要通过Modbus通信访问的设备(逆变器和电度表)。 逆变器的脚本在Python 3版本和最新的Pyserial软件包上都可以很好地工作,但是电能表的脚本需要旧版本的pyserial(新的pyserial模块没有setTimeout函数(已删除))。 我的问题是是否可以将旧的脚本代码编辑到电表并更改setTimeout函数?我当时正在考虑将setTimeout函数更改为超时,但是现在无法对其进行测试(我还没有能量计)。 首先,我还要指出,我是Python的初学者,会寻求经验丰富的人的帮助。

代码形式脚本文件

# -*- coding: utf-8 -*-

import time
import serial
import mbusbase
from mbusbase import get16,get8,bytestr,errcode


class mbclient(object):

    transportOpen = False
    TOUTMSG = 1.0
    TOUTCHAR = 2

    def __init__(self):
        pass

    def getReply(self):
        bstr = self.transportRead(3,self.TOUTMSG)
        if bstr != '' and bstr != None:
            if len(bstr) < 3:
                return None
            bajty = bytearray(bstr)

            # print bytestr(bajty)
            # find expected length

            bytescnt = 0
            if bajty[1] == 1 or bajty[1] == 2:
                bytescnt = int(bajty[2])
            elif bajty[1] == 3 or bajty[1] == 4:
                bytescnt = int(bajty[2])
            elif bajty[1] == 5:
                bytescnt = 4
            elif bajty[1] == 5 or bajty[1] == 6 or bajty[1] == 15 \
                or bajty[1] == 16:
                bytescnt = 4
            elif bajty[1] > 127:
                bytescnt = 1
            bytescnt += 2

            # print 'Expected bytes : ' + str(bytescnt)

            bstr = self.transportRead(bytescnt,self.TOUTCHAR)

            # print 'bytes : ',len(bstr)

            if bstr != '':
                if len(bstr) < bytescnt:
                    return None
                bajtydata = bytearray(bstr)

                # print bytestr(bajtydata)

                apdu = bajty + bajtydata
                rtu = mbusbase.receivedRtu(apdu)
                rtu.clientParse()
                return rtu
        else:
            return None

    def transportOpen(self):
        pass

    def transportClose(self):
        return None

    def transportSend(self,rawbytes):
        return None

    def transportRead(self,count,timeout=None):
        return None


     # ---------------------------------------------------------
     # error resonse

class mbclientserial(mbclient):

    portdev = None

    def transportOpen(
        self,device,brate=9600,stopbit=1,timeout=0.9,):
        self.mainTout = timeout
        self.charTout = 12 * 1 / brate
        self.portdev = None
        try:
            self.portdev = serial.Serial(device,timeout=timeout,baudrate=9600,parity=serial.PARITY_EVEN,stopbits=serial.STOPBITS_ONE)
        except:
            self.portdev = None
            print 'ERROR: cant open serial port: ' + str(device)
            return None
        self.portdev.flushinput()
        self.portdev.setTimeout(timeout)
        self.portdev.baudrate = brate
        self.portdev.parity = serial.PARITY_EVEN
        self.portdev.stopbits = serial.STOPBITS_ONE
        self.transportOpen = True

    def transportClose(self):
        try:
            self.portdev.close()
        except:
            pass
        return None

    def transportSend(self,rawbytes):
        if not self.transportOpen:
            return None
        try:
            bb = bytearray(1)
            for b in rawbytes:
                bb[0] = b
                self.portdev.write(bb)
                time.sleep(0.005)

                        # return self.portdev.write(rawbytes)

            return
        except:
            print "ERROR: can't write serial port "
        return None

    def transportRead(self,timeout):

        if not self.transportOpen:
            return None
        try:
            if timeout:
                if timeout == self.TOUTMSG:
                    self.portdev.setTimeout(self.mainTout)
                elif timeout == self.TOUTCHAR:
                    self.portdev.setTimeout(self.charTout * count)
            return self.portdev.read(count)
        except:
            print "ERROR: can't read serial port "
        return None


if __name__ == '__main__':
    tecl = mbclientserial()
    tecl.transportOpen('/dev/ttyUSB0',brate=9600)

    # r03 = mbusbase.request01(1,0x3,20)

    r03 = mbusbase.request03(1,1,10)
    print bytestr(r03.rtu())
    tecl.transportSend(r03.rtu())
    reply = tecl.getReply()
    if reply:
        if reply.fail:
            print 'dupa' + str(reply.fail)
        else:
            print reply.vals

    # time.sleep(0.5)

    pass
            ```

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)