问题描述
我想问一下APScheduler中实现的同步机制。文档中没有提及。以下代码似乎工作正常,但我想确保对共享变量的并发访问不会导致竞争条件。
import time
import numpy as np
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
class Test(object):
x = None
def __init__(self):
self.x = 0
def step_up(self):
self.x += 1
ts = datetime.Now().replace(microsecond=0)
print(f'Step up {ts}: value of x={self.x}')
def step_heavy(self):
self.x += -1
ts = datetime.Now().replace(microsecond=0)
print(f'Heavy 1 {ts}: value of x={self.x}')
np.random.random(size=(20000,20000))
self.x += -1
ts = datetime.Now().replace(microsecond=0)
print(f'Heavy 2 {ts}: value of x={self.x}')
np.random.random(size=(20000,50000))
self.x += -1
ts = datetime.Now().replace(microsecond=0)
print(f'Heavy 3 {ts}: value of x={self.x}')
def update_setpoint(self):
pass
test = test()
scheduler.add_job(test.step_up,'interval',seconds=1)
scheduler.add_job(test.step_heavy,seconds=15)
scheduler.start()
try:
while True:
time.sleep(2)
except (KeyboardInterrupt,SystemExit):
scheduler.shutdown()
解决方法
您现在拥有的可能会导致竞争条件。由于在幕后 BackgroundScheduler
使用线程,您将需要一个原语,将对该属性的访问限制为一次一个线程。
from ctypes import c_int
from multiprocessing import Value
class Test(object):
def __init__(self):
self._x = Value(c_int,0)
@property
def x(self):
return self._x.value
@x.setter
def x(self,value):
self._x.value = value
@x.deleter
def x(self):
del self._x
# rest of the code remains the same
...
这将防止不同线程之间的竞争条件,因为当线程尝试更新值时,属性将被锁定。