问题描述
这里有两个简单的RequestHandlers
:
class AsyncHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
while True:
future = Future()
global_futures.add(future)
s = yield future
self.write(s)
self.flush()
class AsyncHandler2(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
for f in global_futures:
f.set_result(str(dt.Now()))
global_futures.clear()
self.write("OK")
问题是我的订阅者不能超过一堆(在我的情况下为5-6)。一旦我订阅了超出允许的数量,对第二种方法的下一个请求就会立即挂起。
我认为这是由于第一个处理程序未正确异步而发生的。那是因为我正在使用全局对象来存储订户列表吗?
我如何才能同时打开更多流请求,逻辑上的限制是什么?
解决方法
问题是global_futures
在您对其进行迭代时被修改:AsyncHandler.get
唤醒时,它从一个yield
运行到下一个,这意味着它创建了下一个并在控制权返回到AsyncHandler2
之前将其添加到集合中。这是未定义的,其行为取决于迭代器在集合中的位置:有时将新的Future插入在迭代器的“后面”,并且一切都很好,有时将其插入在迭代器的“前面”,并且将唤醒同一消费者处理程序第二次播放(并插入其自身的第三份副本,该副本可能在前面或后面...)。当您只有几个消费者时,您经常会碰到“幕后花絮”,以至于一切正常,但是如果消费者太多,那么就永远不可能完成。
解决方案是在迭代之前复制global_futures
而不是在最后将其清除:
@gen.coroutine
def get(self);
fs = list(global_futures)
global_futures.clear()
for f in fs:
f.set_result(str(dt.now()))
self.write("OK")
请注意,我认为这只是Tornado 4.x和更早版本中的问题。在Tornado中进行了5处更改,以使set_result
不再立即调用等待的处理程序,因此不再进行并发修改。