Tornado 之gen.engine

Tornado在2011年的2.1版本加入了gen.engine模块,该模块主要为了解决异步程序编写不够优雅的问题。力图让使用者离callback更远,这也是Tornado厉害的地方。本来内部各种事件处理,callback满天飞,可是在用户眼里,它那个class Handler(web.RequestHandler)下面get的写法和同步写法差不多嘛。和同步的写法一样,获得了更高的性能,我想这也是为什么Tornado能出彩的地方吧(本文代码基于v2.3.0)

起因

主要起因就是不够优雅的callback,假设有一个需求,我们需要向第三方api请求数据,然后返回给客户端。此时写法是这样的

class AsyncHandler(RequestHandler):
@asynchronous
    def get(self):
        http_client = AsyncHTTPClient()
        http_client.fetch("http://example.com",
                          callback=self.on_fetch)

    def on_fetch(self, response):
        do_something_with_response(response)
        self.render("template.html")

这种写法会有什么问题?假设我现在需要对返回的数据进行处理,那么我必须要写到回调函数里面。另外,如果在回调函数里面继续要使用其他的回调逻辑,那么肯定也是需要继续在回调函数里面编写,最后就形成了著名的回调地狱。明显这种方式是很让人恶心的,Tornado在这一版本上改动成了这样

class GenAsyncHandler(RequestHandler):
@asynchronous
@gen.engine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield gen.Task(http_client.fetch, "http://example.com")
        do_something_with_response(response)
        self.render("template.html")

这样就好很多了嘛,更符合人们同步编写代码的直觉。观察改变有三个地方

  • 整个函数添加了gen.engine装饰器
  • 添加了yield语句
  • http_client.fetch被gen.Task封装起来

实现

先说一下这个地方的改动。
v2.1.0版本并没有改动任何已有的代码,仅仅是添加了gen.py模块

该commitz在这里
。我觉得这算是比较厉害的地方了

回顾这里用到的知识点,毫无疑问,Tornado里面最核心的yield终于在这个版本上场了。原来http_client.fetch是需要传入一个callback回调函数进行回调的。那么被gen.Task封装之后,这个回调函数不存在了,可以认为这里是gen.Task自己内部传入了一个回调函数。而只要是函数用到了yield关键字,那么它就是一个生成器对象。生成器对象有两个基本的特性,执行get()并不会立即开始执行、执行send后遇到yield会被暂停

考虑到上面说的gen.py是一个很独立的函数,并没有改动任何已有的代码。那么gen.engine装饰器肯定对get()执行了初始化并执行了send(None)让它开始运行起来。可是遇到yield会停止。同时上面的语义是再次恢复后以前http_client.fetch传递给回调函数的值需要赋值给response变量,生成器的send方法是可以做到这一点的,这里可能并不太好理解。

现在对功能进行划分

  • gen.engine装饰器的主要作用是让生成器运行起来(调用后再执行send(None))
  • gen.Task的作用是将http_client.fetch 封装成Task对象,并传入一个回调,该回调使得send被再次调用
  • Runner 这是一个对用户无感知的类。连接gen.engine和gen.Task

基础实现代码如下

class Runner():
    def __init__(self, gen):
        self.gen = gen()
        self.yielded = self.gen.send(None)
        self.yielded.start(self)

    def result_callback(self, value):
        self.result = value
        self.run()

    def run(self):
        try:
            self.yielded = self.gen.send(self.result)
        except StopIteration:
            return
        else:
            self.yielded.start(self)

class Task():
    def __init__(self, func):
        self.func = func
    def start(self, runner):
        self.func(callback=runner.result_callback)

def engine(func):
    def inner():
        return Runner(func)
    return inner

可以看到gen.engine是一个非常简单的装饰器,将生成器传递给Runner并返回。在Runner里面它被执行send(None)。这个时候send(None)返回的是Task(http_client.fetch, “ http://example.com”)。

Task的封装也可以很简单,它仅仅存在一个start。传入runner并将http_client.fetch的回调设置为runner.result_callback

Runner呢,它负责了让生成器开始运行,并拥有result_callback函数,在callback里面,它调用了自身的run函数。在run里面再次调用send,此时send的值是http_client.fetch返回的值。如果没有触发StopIteration异常则表明再一次返回了Task对象。重复过程执行Task.start

这样它很巧妙的将callback驱动变成了yield驱动,要知道Task必定会调用callback函数,当callback被调用的时候就等同于赋值给了yield左边的变量

改进

上面的代码并没有考虑异常情况,另外可能存在这种情况

def get(self):
    http_client = AsyncHTTPClient()
    response1, response2 = yield [gen.Task(http_client.fetch, url1),
                                  gen.Task(http_client.fetch, url2)]

我们希望两个Task同时执行,而不是等一个得到结果后,再去请求第二个结果。因此需要稍微复杂一些。我们给每一个Task标记一个独立的key,并修改Task的callback部分,让回调的时候知道属于哪一个Task对象。于此同时Task多了两个方法 get_result
is_ready

方法。在首次得到yielded对象后。如果判断是list对象,那么对该list再次进行封装得到Multi对象,它的is_ready会检查多个Task是不是都得到结果。在被执行回调后执行到Runner.run(),会先检查是否满足is_ready。如果不满足说明还有的Task并没有返回结果,直接返回。等待下一次被回调。均满足才得到全部结果执行send操作

总结

这段代码非常独立,还是很有看头很精彩的。被gen.engine修饰一次意味着存在一个Runner,这个Runner配合着多个Task,主要可以看做Task和Runner互相调用的过程。Task每被回调一次则Runner.run()被调用一次,执行一次send操作,yield往下走一次返回下一个Task对象。另外这段代码里面还提供了很坑爹的Callback、Wait控制方式。思维有点奇葩,一般人不太会去用,实现倒是并不复杂

责编内容来自:邹雷 (源链) | 更多关于

阅读提示:酷辣虫无法对本内容的真实性提供任何保证,请自行验证并承担相关的风险与后果!
本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。
酷辣虫 » 后端存储 » Tornado 之gen.engine

喜欢 (0)or分享给?

专业 x 专注 x 聚合 x 分享 CC BY-NC-SA 4.0

使用声明 | 英豪名录