Twisted 之 reactor

在 Twisted中,有一个全局用于实现事件循环的对象为reactor。

反应器具体的工作包括:定时任务、线程、建立网络连接、监听连接。

1、定时器简单实例

例子如下:
#! /usr/bin/using_reactor.py
# Filename:using_reactor.py
from twisted.internet import reactor
import time
def printTime():
    print 'Current time is',time.strftime("%H:%M:%S")

def stopReactor():
    print "Stopping reactor"
    reactor.stop()

reactor.callLater(1,printTime)
reactor.callLater(2,printTime)
reactor.callLater(3,printTime)
reactor.callLater(4,printTime)
reactor.callLater(5,stopReactor)

print 'Running the reactor ...'
reactor.run()
print 'Reactor stopped.'

运行结果:
python using_reactor.py
Running the reactor ...
Current time is 17:44:58
Current time is 17:44:59
Current time is 17:45:00
Current time is 17:45:01
Stopping reactor
Reactor stopped.


运行过程:
使用reactor.callLater函数定时执行函数。reactor.callLater函数包含两个必须参数,等待的秒数,和需要调用的函数。

反应器在被告知停止之前会一直运行,直到reactor.stop()调用。
在实际应用中,reactor.callLater是常用于超时处理和定时事件。

2、协议与工厂

例子如下:
#! /usr/bin/using_tcp.py
# Filename:using_tcp.py
from twisted.internet import reactor,protocol

class QuickDisconnectedProtocol(protocol.Protocol):
    def connectionMade(self):
        print "Connected to %s." %self.transport.getPeer().host
        self.transport.loseConnection()

class BasicClientFactory(protocol.ClientFactory):
    protocol = QuickDisconnectedProtocol
    def clientConnectionLost(self,connector,reason):
        print 'Lost connection: %s' %reason.getErrorMessage()
        reactor.stop()

    def clientConnectionFailed(self,reason):
        print 'Connection failed: %s' %reason.getErrorMessage()
        reactor.stop()

reactor.connectTCP('www.google.com',80,BasicClientFactory())
reactor.run()

运行结果:

python using_tcp.py
Connected to 74.125.71.99.
Lost connection: Connection was closed cleanly.
运行过程:
这里有两个主要的类用于作为客户端工作,ClientFactory和Protocol。

这些类被设计成处理连接中所有可能运到的事件:成功建立连接、连接失败、连接断开、数据传送等等。

(1)协议

QuickDisconnectProtocol为自定义的Protocol,继承自protocol.Protocol。它重载了一个方法connectMade。这个方法在连接成功时运行。在reactor刚刚成功建立了连接,ClientFactory创建了QuickDisconnectProtocol的实例时。Protocol对象有一个属性叫做transport,包含了当前活动连接对象。

(2)工厂

ClientFactory的工作是管理连接事件,并且创建Protocol对象处理每一个成功的连接。一旦连接建立,Protocol对象就接管下面的工作了,包括收发数据和决定是否关闭连接。

BasicClientFactory是继承自protocol.ClientFactory的类。它首先设置了类变量protocol为QuickDisconnectProtocol。这个类的实例被创建用于管理成功的连接。

BasicClientFactory 重载了ClientFactory的两个方法,clientConnectionLost和clientConnectionFailed:

1)clientConnectionFailed在反应器建立连接失败时被调用

2)clientConnectionLost在建立的连接被关闭或断开时调用

通知反应器建立TCP连接,如调用reactor.connectTCP: reactor.connectTCP(‘www.google.com’,BasicClientFactory()),通知反应器建立一个TCP连接到服务器www.google.com的80端口,通过BasicClientFactory来管理连接。


2、具体应用

(1)被动连接工厂

server类

class server
    def __init__(self,config):
        self.nodeId = config.nodeId
        self.channels = {} #channels字典,nodeId 对应channel对象
    def startRun(self): 
	#这里启动监听连接
        reactor.listenTCP(svrNode.port,ServerConnectionFactory(self,self.nodeId))
	for nodeId in [1,2,3]:#这里应该是客户端节点列表(需要自己实现)
		reactor.listenTCP(svrNode.port,nodeId))

#服务器连接工厂,用来生成制造出协议类对象
class ServerConnectionFactory(protocol.ServerFactory):
    protocol = ServerConnection
    def __init__(self,host,nodeId):
        self.host = host
        self.nodeId = nodeId

    def buildProtocol(self,addr):
        p = protocol.ServerFactory.buildProtocol(self,addr)
        p.host = self.host
        p.serverId = self.nodeId
        p.clientId = None
        return p


服务器连接类

class ServerConnection(protocol.Protocol):
     #处理连接制造的初始化,可以在这里设置一个空的chanel,chanel会包含transfer对象,用来处理数据发送
     def connectionMade(self):
         ....
     #处理连接断开后,销毁chanel
     def connectionLost(self,reason):
         ...
         #这里处理数据接受,并把接受的数据写到channel的缓存,如果没有建立channel,就先建立
         #会在第一次接受数据时设置channel对象
         #所有的channel存在于server的channels字典
         #接受的数据需要验证验证码,否则就主动断开连接,使用self.transport.loseConnection(),可以自定义
         #一个握手信息,在第一次接受数据时验证,以后就不用验证了。
     def dataReceived(self,data):
         ...
     #在被底层删除时,需要添加的自定义处理
     def __del__(self):
	 ...

会话类
class Channel(object):
     #在serverConnextion(服务器接受连接协议)里面创建Channel时就要传入一个transport对象
     #(protocol.Protocol的成员,来自于from twisted.internet import protocol,在twisted里面实现的协议基础类)
     # 设置缓冲区成员(初始是空字符串),设置server作为Channel的监听者,打包好的缓冲数据(反序列化后),就交由
     #server的频道回调处理函数onChannelEvent,该函数会发送到具体服务节点,根据服务id就发送给具体的服务实例或者具体的节点id
     #发送给具体chanel对应的远程服务(建立连接的客户端或服务端)
    def __init__(self,transport):
	...

(2)主动连接工厂

主动连接客户端的也类似:

客户端连接协议

class ClientConnection(protocol.Protocol):
	#简历连接,发送握手信息(对端开始时要检验)
	def connectionMade(self):
	    ...
	#连接断开,销毁channel
	def connectionLost(self,reason):
	    ...
	def dataReceived(self,data): #接受数据,第一次时建立channel,数据保存在channel的缓冲区
	    ...
class ClientConnectionFactory(protocol.ReconnectingClientFactory):
	#初始化设置主机(为server),和节点id,设置重新连接延迟(5s)
	def __init__(self,nodeId):
	    ...
	#返回客户端连接协议实例
	def buildProtocol(self,addr):
	    ...
	#连接断开,使用protocol.ReconnectingClientFactory.clientConnectionLost尝试重新连接
	def clientConnectionLost(self,reason):
	    ...
	#连接断开,使用protocol.ReconnectingClientFactory.clientConnectionFailed尝试重新连接
	def clientConnectionFailed(self,reason):
	    ...

相关文章

react 中的高阶组件主要是对于 hooks 之前的类组件来说的,如...
我们上一节了解了组件的更新机制,但是只是停留在表层上,例...
我们上一节了解了 react 的虚拟 dom 的格式,如何把虚拟 dom...
react 本身提供了克隆组件的方法,但是平时开发中可能很少使...
mobx 是一个简单可扩展的状态管理库,中文官网链接。小编在接...
我们在平常的开发中不可避免的会有很多列表渲染逻辑,在 pc ...