Thespian参与者中的无限异步循环 我想在Thespian演员内进行无限循环用于某些连续过程

问题描述

我想在Thespian演员内进行无限循环(用于某些连续过程)。

我试图用receiveMessage_Run()方法开始循环(请参见下面的代码),但是我无法结束循环。该演员被阻止接收新消息。

有人可以帮助我完成此任务吗? 如何运行无限循环(似乎是异步的)并保存对其他消息作出反应的可能性以及停止循环和参与者的可能性?

我的测试代码

from thespian.actors import ActorExitRequest,ActorSystem,Actor,ActorTypedispatcher
import time
import random

# Message 'Run'
class Run():
    pass

# Message 'Stop'
class Stop():
    pass


class Actor(ActorTypedispatcher):
    def __init__(self):
        self.stop = False

    # It seems,it must be asynchronous
    def loop(self):
        while not self.stop:
            time.sleep(1)
            print(random.random())

    def receiveMsg_Run(self,data,sender):
        self.loop()
        self.send(sender,'Finished the loop.')

    def receiveMsg_Stop(self,msg,sender):
        self.stop = True
        self.send(self.myAddress,ActorExitRequest)
        print('*' * 1000)

    def receiveMsg_ActorExitRequest(self,sender):
        self.stop = True
        time.sleep(1)


if __name__ == '__main__':
    asys = ActorSystem('multiprocQueueBase')

    loop = asys.createActor(Actor)

    resp = asys.ask(loop,Run())
    print(resp)
    time.sleep(4)
    asys.tell(loop,Stop())

    ActorSystem().shutdown()

解决方法

Thespian实际上会无限循环地运行每个actor,直到收到ActorExitRequest()或Actor系统关闭为止。但是,每个actor只是一个执行线程,因此,您发现,如果actor的receiveMessage()方法永不退出,Thespian将无法运行以处理其他传入请求(或完成两个异步出站发送)。单执行线程的优点是Actor的上下文简单明了:它不需要对其成员变量进行互斥保护,也不需要考虑数据竞争,死锁或值损坏。

class Actor(ActorTypeDispatcher):
    def __init__(self):
        self.stop = False
        self.requestor = None

    def run_single(self):
            print(random.random())

    def receiveMsg_Run(self,data,sender):
        self.requestor = sender
        self.run_single()
        self.wakeupAfter(datetime.timedelta(seconds=1))

    def receiveMsg_WakeupMessage(self,msg,sender):
        if self.stop:
            self.send(self.requestor,'Finished the loop.')
        else:
            self.run_single()
            self.wakeupAfter(datetime.timedelta(seconds=1))

    def receiveMsg_Stop(self,sender):
        self.stop = True
        # Note that this next line will probably remove the need for self.stop entirely:
        # receiveMsg_WakeupMessage will not get a chance to run when self.stop is True
        self.send(self.myAddress,ActorExitRequest)
        print('*' * 1000)

    def receiveMsg_ActorExitRequest(self,sender):
        self.stop = True
        time.sleep(1)

有关更多详细信息,请参见https://thespianpy.com/doc/using.html#hH-9cc1acd4-7f35-44ac-94fc-0326dc87a7a5

如果您的run_once确实确实长时间阻塞了,并且无法通过上述方法定期进行调度,则可以考虑使用单独的线程;不幸的是,这是不同处理和调度概念(Actor模型与线程)的混合,但是有时候当Actor必须运行用该替代模型编写的代码时,有时是必需的。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...