如何在Tornado中编写非阻塞,分块的RequestHandler

问题描述

这里有两个简单的RequestHandlers

class AsyncHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        while True:
            future = Future()
            global_futures.add(future)
            s = yield future
            self.write(s)
            self.flush()


class AsyncHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for f in global_futures:
            f.set_result(str(dt.Now()))
        global_futures.clear()
        self.write("OK")

一个订阅”流,第二个向所有订阅者传递消息。

问题是我的订阅者不能超过一堆(在我的情况下为5-6)。一旦我订阅了超出允许的数量,对第二种方法的下一个请求就会立即挂起。

我认为这是由于第一个处理程序未正确异步而发生的。那是因为我正在使用全局对象来存储订户列表吗?

我如何才能同时打开更多流请求,逻辑上的限制是什么?

解决方法

问题是global_futures在您对其进行迭代时被修改:AsyncHandler.get唤醒时,它从一个yield运行到下一个,这意味着它创建了下一个并在控制权返回到AsyncHandler2之前将其添加到集合中。这是未定义的,其行为取决于迭代器在集合中的位置:有时将新的Future插入在迭代器的“后面”,并且一切都很好,有时将其插入在迭代器的“前面”,并且将唤醒同一消费者处理程序第二次播放(并插入其自身的第三份副本,该副本可能在前面或后面...)。当您只有几个消费者时,您经常会碰到“幕后花絮”,以至于一切正常,但是如果消费者太多,那么就永远不可能完成。

解决方案是在迭代之前复制global_futures而不是在最后将其清除:

@gen.coroutine
def get(self);
    fs = list(global_futures)
    global_futures.clear()
    for f in fs:
        f.set_result(str(dt.now()))
    self.write("OK")

请注意,我认为这只是Tornado 4.x和更早版本中的问题。在Tornado中进行了5处更改,以使set_result不再立即调用等待的处理程序,因此不再进行并发修改。