V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
rapospectre
V2EX  ›  Python

深入理解 tornado 之 底层 ioloop 实现(一)

  •  1
     
  •   rapospectre ·
    bluedazzle · 2016-06-06 21:11:55 +08:00 · 5975 次点击
    这是一个创建于 2881 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近打算学习 tornado 的源码,所以就建立一个系列主题 **“深入理解 tornado ”**。 在此记录学习经历及个人见解与大家分享。文中一定会出现理解不到位或理解错误的地方,还请大家多多指教:

    进入正题:

    tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 wsgi 写的简单服务器,并没有自己实现底层结构。 关于 wsgi 详见之前的文章: 自己写一个 wsgi 服务器运行 Django 、 Tornado 应用)。 那么 tornado.ioloop 就是 tornado web server 最底层的实现。

    看 ioloop 之前,我们需要了解一些预备知识,有助于我们理解 ioloop 。

    epoll

    ioloop 的实现基于 epoll ,那么什么是 epoll ? epoll 是 Linux 内核为处理大批量文件描述符而作了改进的 poll 。 那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:

    1. 一直在这里等着直到收发数据结束;
    2. 每隔一定时间来看看这里有没有数据;

    第一种办法虽然可以解决问题,但我们要注意的是对于一个线程\进程同时只能处理一个 socket 通信,其他连接只能被阻塞。 显然这种方式在单进程情况下不现实。

    第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。

    对于 epoll 的操作,其实也很简单,只要 4 个 API 就可以完全操作它。

    epoll_create

    用来创建一个 epoll 描述符( 就是创建了一个 epoll )

    epoll_ctl

    操作 epoll 中的 event ;可用参数有:

    | 参数 | 含义 | | ------------ | ------------ | | EPOLL_CTL_ADD | 添加一个新的 epoll 事件 | | EPOLL_CTL_DEL | 删除一个 epoll 事件 | | EPOLL_CTL_MOD | 改变一个事件的监听方式 |

    而事件的监听方式有七种,而我们只需要关心其中的三种:

    | 宏定义 | 含义 | | ------------ | ------------ | | EPOLLIN | 缓冲区满,有数据可读 | | EPOLLOUT | 缓冲区空,可写数据 | | EPOLLERR | 发生错误 |

    epoll_wait

    就是让 epoll 开始工作,里面有个参数 timeout ,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。

    在监听时有数据活跃的连接时其返回活跃的文件句柄列表(此处为 socket 文件句柄)。

    close

    关闭 epoll

    现在了解了 epoll 后,我们就可以来看 ioloop 了 (如果对 epoll 还有疑问可以看这两篇资料: epoll 的原理是什么百度百科: epoll

    tornado.ioloop

    很多初学者一定好奇 tornado 运行服务器最后那一句 tornado.ioloop.IOLoop.current().start() 到底是干什么的。 我们先不解释作用,来看看这一句代码背后到底都在干什么。

    先贴 ioloop 代码:

    from __future__ import absolute_import, division, print_function, with_statement
    
    import datetime
    import errno
    import functools
    import heapq       # 最小堆
    import itertools
    import logging
    import numbers
    import os
    import select
    import sys
    import threading
    import time
    import traceback
    import math
    
    from tornado.concurrent import TracebackFuture, is_future
    from tornado.log import app_log, gen_log
    from tornado.platform.auto import set_close_exec, Waker
    from tornado import stack_context
    from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds
    
    try:
        import signal
    except ImportError:
        signal = None
    
    
    if PY3:
        import _thread as thread
    else:
        import thread
    
    
    _POLL_TIMEOUT = 3600.0
    
    
    class TimeoutError(Exception):
        pass
    
    
    class IOLoop(Configurable):
        _EPOLLIN = 0x001
        _EPOLLPRI = 0x002
        _EPOLLOUT = 0x004
        _EPOLLERR = 0x008
        _EPOLLHUP = 0x010
        _EPOLLRDHUP = 0x2000
        _EPOLLONESHOT = (1 << 30)
        _EPOLLET = (1 << 31)
    
        # Our events map exactly to the epoll events
        NONE = 0
        READ = _EPOLLIN
        WRITE = _EPOLLOUT
        ERROR = _EPOLLERR | _EPOLLHUP
    
        # Global lock for creating global IOLoop instance
        _instance_lock = threading.Lock()
    
        _current = threading.local()
    
        @staticmethod
        def instance():
            if not hasattr(IOLoop, "_instance"):
                with IOLoop._instance_lock:
                    if not hasattr(IOLoop, "_instance"):
                        # New instance after double check
                        IOLoop._instance = IOLoop()
            return IOLoop._instance
    
        @staticmethod
        def initialized():
            """Returns true if the singleton instance has been created."""
            return hasattr(IOLoop, "_instance")
    
        def install(self):
            assert not IOLoop.initialized()
            IOLoop._instance = self
    
        @staticmethod
        def clear_instance():
            """Clear the global `IOLoop` instance.
            .. versionadded:: 4.0
            """
            if hasattr(IOLoop, "_instance"):
                del IOLoop._instance
    
        @staticmethod
        def current(instance=True):
            current = getattr(IOLoop._current, "instance", None)
            if current is None and instance:
                return IOLoop.instance()
            return current
    
        def make_current(self):
            IOLoop._current.instance = self
    
        @staticmethod
        def clear_current():
            IOLoop._current.instance = None
    
        @classmethod
        def configurable_base(cls):
            return IOLoop
    
        @classmethod
        def configurable_default(cls):
            if hasattr(select, "epoll"):
                from tornado.platform.epoll import EPollIOLoop
                return EPollIOLoop
            if hasattr(select, "kqueue"):
                # Python 2.6+ on BSD or Mac
                from tornado.platform.kqueue import KQueueIOLoop
                return KQueueIOLoop
            from tornado.platform.select import SelectIOLoop
            return SelectIOLoop
    
        def initialize(self, make_current=None):
            if make_current is None:
                if IOLoop.current(instance=False) is None:
                    self.make_current()
            elif make_current:
                if IOLoop.current(instance=False) is not None:
                    raise RuntimeError("current IOLoop already exists")
                self.make_current()
    
        def close(self, all_fds=False):
            raise NotImplementedError()
    
        def add_handler(self, fd, handler, events):
            raise NotImplementedError()
    
        def update_handler(self, fd, events):
            raise NotImplementedError()
    
        def remove_handler(self, fd):
            raise NotImplementedError()
    
        def set_blocking_signal_threshold(self, seconds, action):
            raise NotImplementedError()
    
        def set_blocking_log_threshold(self, seconds):
            self.set_blocking_signal_threshold(seconds, self.log_stack)
    
        def log_stack(self, signal, frame):
            gen_log.warning('IOLoop blocked for %f seconds in\n%s',
                            self._blocking_signal_threshold,
                            ''.join(traceback.format_stack(frame)))
    
        def start(self):
            raise NotImplementedError()
    
        def _setup_logging(self):
            if not any([logging.getLogger().handlers,
                        logging.getLogger('tornado').handlers,
                        logging.getLogger('tornado.application').handlers]):
                logging.basicConfig()
    
        def stop(self):
            raise NotImplementedError()
    
        def run_sync(self, func, timeout=None):
            future_cell = [None]
    
            def run():
                try:
                    result = func()
                    if result is not None:
                        from tornado.gen import convert_yielded
                        result = convert_yielded(result)
                except Exception:
                    future_cell[0] = TracebackFuture()
                    future_cell[0].set_exc_info(sys.exc_info())
                else:
                    if is_future(result):
                        future_cell[0] = result
                    else:
                        future_cell[0] = TracebackFuture()
                        future_cell[0].set_result(result)
                self.add_future(future_cell[0], lambda future: self.stop())
            self.add_callback(run)
            if timeout is not None:
                timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
            self.start()
            if timeout is not None:
                self.remove_timeout(timeout_handle)
            if not future_cell[0].done():
                raise TimeoutError('Operation timed out after %s seconds' % timeout)
            return future_cell[0].result()
    
        def time(self):
            return time.time()
    ...
    

    IOLoop 类首先声明了 epoll 监听事件的宏定义,当然,如前文所说,我们只要关心其中的 EPOLLIN 、 EPOLLOUT 、 EPOLLERR 就行。

    类中的方法有很多,看起来有点晕,但其实我们只要关心 IOLoop 核心功能的方法即可,其他的方法在明白核心功能后也就不难理解了。所以接下来我们着重分析核心代码。

    instanceinitializedinstallclear_instancecurrentmake_currentclear_current 这些方法不用在意细节,总之现在记住它们都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop 就好。

    你一定疑惑 IOLoop 为何没有 __init__, 其实是因为要初始化成为单例, IOLoop 的 new 函数已经被改写了,同时指定了 initialize 做为它的初始化方法,所以此处没有 __init__ 。 说到这, ioloop 的代码里好像没有看到 new 方法,这又是什么情况? 我们先暂时记住这里。

    接着我们来看这个初始化方法:

    def initialize(self, make_current=None):
            if make_current is None:
                if IOLoop.current(instance=False) is None:
                    self.make_current()
            elif make_current:
                if IOLoop.current(instance=False) is None:
                    raise RuntimeError("current IOLoop already exists")
                self.make_current()
    
        def make_current(self):
            IOLoop._current.instance = self
    

    what? 里面只是判断了是否第一次初始化或者调用 self.make_current () 初始化,而 make_current() 里也仅仅是把实例指定为自己,那么初始化到底去哪了?

    然后再看看 start()run()close() 这些关键的方法都成了返回 NotImplementedError 错误,全部未定义?!跟网上搜到的源码分析完全不一样啊。 这时候看下 IOLoop 的继承关系,原来问题出在这里,之前的 tornado.ioloop 继承自 object 所以所有的一切都自己实现,而现在版本的 tornado.ioloop 则继承自 Configurable 看起来现在的 IOLoop 已经成为了一个基类,只定义了接口。 所以接着看 Configurable 代码:

    tornado.util.Configurable

    class Configurable(object):
        __impl_class = None
        __impl_kwargs = None
    
        def __new__(cls, *args, **kwargs):
            base = cls.configurable_base()
            init_kwargs = {}
            if cls is base:
                impl = cls.configured_class()
                if base.__impl_kwargs:
                    init_kwargs.update(base.__impl_kwargs)
            else:
                impl = cls
            init_kwargs.update(kwargs)
            instance = super(Configurable, cls).__new__(impl)
            # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
            # singleton magic.  If we get rid of that we can switch to __init__
            # here too.
            instance.initialize(*args, **init_kwargs)
            return instance
    
        @classmethod
        def configurable_base(cls):
            """Returns the base class of a configurable hierarchy.
    
            This will normally return the class in which it is defined.
            (which is *not* necessarily the same as the cls classmethod parameter).
            """
            raise NotImplementedError()
    
        @classmethod
        def configurable_default(cls):
            """Returns the implementation class to be used if none is configured."""
            raise NotImplementedError()
    
        def initialize(self):
            """Initialize a `Configurable` subclass instance.
    
            Configurable classes should use `initialize` instead of ``__init__``.
    
            .. versionchanged:: 4.2
               Now accepts positional arguments in addition to keyword arguments.
            """
    
        @classmethod
        def configure(cls, impl, **kwargs):
            """Sets the class to use when the base class is instantiated.
    
            Keyword arguments will be saved and added to the arguments passed
            to the constructor.  This can be used to set global defaults for
            some parameters.
            """
            base = cls.configurable_base()
            if isinstance(impl, (unicode_type, bytes)):
                impl = import_object(impl)
            if impl is not None and not issubclass(impl, cls):
                raise ValueError("Invalid subclass of %s" % cls)
            base.__impl_class = impl
            base.__impl_kwargs = kwargs
    
        @classmethod
        def configured_class(cls):
            """Returns the currently configured class."""
            base = cls.configurable_base()
            if cls.__impl_class is None:
                base.__impl_class = cls.configurable_default()
            return base.__impl_class
    
        @classmethod
        def _save_configuration(cls):
            base = cls.configurable_base()
            return (base.__impl_class, base.__impl_kwargs)
    
        @classmethod
        def _restore_configuration(cls, saved):
            base = cls.configurable_base()
            base.__impl_class = saved[0]
            base.__impl_kwargs = saved[1]
    

    之前我们寻找的 __new__ 出现了! 注意其中这句: impl = cls.configured_class() impl 在这里就是 epoll ,它的生成函数是 configured_class(), 而其方法里又有 base.__impl_class = cls.configurable_default() ,调用了 configurable_default() 。而 Configurableconfigurable_default():

    v2ex 限制了文章最大长度 20000 ,可以继续看第二部分或直接点击原文阅读

    8 条回复    2016-06-07 17:50:42 +08:00
    jy02201949
        1
    jy02201949  
       2016-06-06 21:28:25 +08:00
    先收再看
    loalj
        2
    loalj  
       2016-06-06 21:30:30 +08:00
    mark
    Jaylee
        3
    Jaylee  
       2016-06-06 21:36:41 +08:00
    mark
    hanfeng3015
        4
    hanfeng3015  
       2016-06-07 09:25:13 +08:00
    mark
    1130335361
        5
    1130335361  
       2016-06-07 13:37:27 +08:00   ❤️ 1
    可以考虑给 blog 加个 rss
    rapospectre
        6
    rapospectre  
    OP
       2016-06-07 16:54:21 +08:00
    @1130335361 好建议!已经加上啦
    1130335361
        7
    1130335361  
       2016-06-07 17:22:24 +08:00
    @rapospectre 已订阅,但是 rss 里的 content 没加内容,无法全文输出
    rapospectre
        8
    rapospectre  
    OP
       2016-06-07 17:50:42 +08:00
    @1130335361 现在博文正文部分后端存的都是 markdown ,显示是前端动态渲染成 html 的。所以 rss 里现在如果加进去都是原始的 markdown 。之后我处理下再把正文加进去。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2851 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 07:07 · PVG 15:07 · LAX 00:07 · JFK 03:07
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.