问题描述
我想在Thespian演员内进行无限循环(用于某些连续过程)。
我试图用receiveMessage_Run()
方法开始循环(请参见下面的代码),但是我无法结束循环。该演员被阻止接收新消息。
有人可以帮助我完成此任务吗? 如何运行无限循环(似乎是异步的)并保存对其他消息作出反应的可能性以及停止循环和参与者的可能性?
我的测试代码:
from thespian.actors import ActorExitRequest,ActorSystem,Actor,ActorTypedispatcher
import time
import random
# Message 'Run'
class Run():
pass
# Message 'Stop'
class Stop():
pass
class Actor(ActorTypedispatcher):
def __init__(self):
self.stop = False
# It seems,it must be asynchronous
def loop(self):
while not self.stop:
time.sleep(1)
print(random.random())
def receiveMsg_Run(self,data,sender):
self.loop()
self.send(sender,'Finished the loop.')
def receiveMsg_Stop(self,msg,sender):
self.stop = True
self.send(self.myAddress,ActorExitRequest)
print('*' * 1000)
def receiveMsg_ActorExitRequest(self,sender):
self.stop = True
time.sleep(1)
if __name__ == '__main__':
asys = ActorSystem('multiprocQueueBase')
loop = asys.createActor(Actor)
resp = asys.ask(loop,Run())
print(resp)
time.sleep(4)
asys.tell(loop,Stop())
ActorSystem().shutdown()
解决方法
Thespian实际上会无限循环地运行每个actor,直到收到ActorExitRequest()
或Actor系统关闭为止。但是,每个actor只是一个执行线程,因此,您发现,如果actor的receiveMessage()
方法永不退出,Thespian将无法运行以处理其他传入请求(或完成两个异步出站发送)。单执行线程的优点是Actor的上下文简单明了:它不需要对其成员变量进行互斥保护,也不需要考虑数据竞争,死锁或值损坏。
class Actor(ActorTypeDispatcher):
def __init__(self):
self.stop = False
self.requestor = None
def run_single(self):
print(random.random())
def receiveMsg_Run(self,data,sender):
self.requestor = sender
self.run_single()
self.wakeupAfter(datetime.timedelta(seconds=1))
def receiveMsg_WakeupMessage(self,msg,sender):
if self.stop:
self.send(self.requestor,'Finished the loop.')
else:
self.run_single()
self.wakeupAfter(datetime.timedelta(seconds=1))
def receiveMsg_Stop(self,sender):
self.stop = True
# Note that this next line will probably remove the need for self.stop entirely:
# receiveMsg_WakeupMessage will not get a chance to run when self.stop is True
self.send(self.myAddress,ActorExitRequest)
print('*' * 1000)
def receiveMsg_ActorExitRequest(self,sender):
self.stop = True
time.sleep(1)
有关更多详细信息,请参见https://thespianpy.com/doc/using.html#hH-9cc1acd4-7f35-44ac-94fc-0326dc87a7a5。
如果您的run_once
确实确实长时间阻塞了,并且无法通过上述方法定期进行调度,则可以考虑使用单独的线程;不幸的是,这是不同处理和调度概念(Actor模型与线程)的混合,但是有时候当Actor必须运行用该替代模型编写的代码时,有时是必需的。