低调大师

您现在的位置是: 首页 > Tornado1.0源码分析-Networking

文章详情

Tornado1.0源码分析-Networking

2015-2-28 16:50 9已围观 收藏 加入我们

#Asynchronous NetWorking#

作者:MetalBug
时间:2015-02-28
出处:http://my.oschina.net/u/247728/blog
声明:版权所有,侵犯必究
  • tornado.ioloop — Main event loop
  • tornado.iostream — Convenient wrappers for non-blocking sockets

##1.ioloop##

IOLoop是一个非阻塞的IO事件循环。 典型的应用使用一个IOLoop对象,通常通过IOLoop.instance()获得。 使用以下三个函数往IOLoop中注册事件,回调或者定时器,最后使用IOLoop.start()即可。

IOLoop.add_handler(fd, handler, events)

增加一个IO事件,当事件发生时,hanlder(fd,events)会被调用

IOLoop.add_callback(callback, *args, **kwargs)

增加一个回调函数,该回调函数将会在下一次IO迭代中被执行。

IOLoop.add_timeout(self, deadline, callback)

增加一个定时器,该定时器会在到达dealline时被执行。

以下是个简单的例子,利用IOLoop实现了一个TCP Server

    import errno
    import functools
    import ioloop
    import socket

    def connection_ready(sock, fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error, e:
                if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            connection.setblocking(0)
            handle_connection(connection, address)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", port))
    sock.listen(128)

    io_loop = ioloop.IOLoop.instance()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    io_loop.start()

内部实现中IOLoop根据不同的操作系统使用不同的IO复用机制。

  • Linux -------epoll:使用的是level-triggered触发方式。
  • FreeBSD ---kQuene
  • other -----select

###1.1内部实现-数据结构###

    self._impl = impl or _poll()
    self._handlers = {}
    self._events = {}
    self._callbacks = set()
    self._timeouts = []
    self._running = False
    self._stopped = False
    self._blocking_log_threshold = None

_impl表示后端使用的IO复用机制,可以自己指定或者采用系统默认

_handlers维护了fd与其对应handler关系

_events维护了fd与其对应event关系

_callback维护所有在下一个IO迭代中会被调用的回调函数

_timeouts是一个有序列表,根据deadline排序,保存了未到期的所有定时器

_running_stopped是用于表示IOLoop是否start

_blocking_log_threshold表示最大阻塞时间

###1.2内部实现-主要函数### IOLoop.start()实现了事件循环,内部实现为一个巨大的while循环,在每次迭代中,会一次检查以下事件:

  1. _callbacks
  2. _timeouts
  3. poll返回的_events

一旦事件就绪,就出发对应的回调函数,这个循环会一直持续到IOLoop.stop()的调用,即_stopped被置为True

IOLoop.start()流程图:

IOLoop.start()流程图

###1.3内部实现-实现细节###

  1. 由于IOLoop平时阻塞在poll调用中,为了让IOLoop能够立即执行callback函数,需要设法唤醒它。这里采用的是pipeIOLoop始终监视该管道的readable事件,在需要唤醒的时候,往管道中写入一个字节,这样IOLoop技能从IO复用(poll)中返回。

    初始化pipe if os.name != 'nt': r, w = os.pipe() self._set_nonblocking(r) self._set_nonblocking(w) self._set_close_exec(r) self._set_close_exec(w) self._waker_reader = os.fdopen(r, "r", 0) self._waker_writer = os.fdopen(w, "w", 0)

    唤醒IOLoop def _wake(self): try: self._waker_writer.write("x") except IOError: pass 在add_callback时,需要唤醒IOLoop从而使其立即执行callback def add_callback(self, callback): self._callbacks.add(callback) self._wake()

  1. 因为callback函数能够对_callbacks进行修改(add, remove)等,所以用一个局部变量存储当前_callbacks,对该局部变量进行操作。

对于_callbacks的执行,并没有反复执行callback直到_callbacks为空,这里这样做应该是为了防止IOLoop陷入死循环,无法处理IO时间,而且也设置_blocking_log_threshold,通过singertimer来防止IOLoop卡死。 如果_callbacks不能执行完,这里会将poll_timeout设置为0,即为立即返回,为的的在下次IO迭代中能够立即执行_callbacks

    callbacks = list(self._callbacks)
    for callback in callbacks:
        if callback in self._callbacks:
            self._callbacks.remove(callback)
            self._run_callback(callback)
    if self._callbacks:
        poll_timeout = 0.0
  1. 对于_timeouts,采用列表存储,并且按照deadline从小到大排序,这样才每个IO迭代中,只需要从头开始遍历列表得到比deadline小于当前时间的事件并执行即可。

    #_Timeout的cmp函数
    def __cmp__(self, other):
        return cmp((self.deadline, id(self.callback)),
                   (other.deadline, id(other.callback)))
    #往IOLoop中添加timeout,保持有序
    def add_timeout(self, deadline, callback):
        timeout = _Timeout(deadline, callback)
        bisect.insort(self._timeouts, timeout)
        return timeout

##2.iostream## iostream对非阻塞式的 socket 的简单封装,以方便常用读写操作。

###2.1内部实现-数据结构###

IOStream内部维护了一个read_buffer和write_buffer,将维护的socket注册到IOLoop上,利用IOLoop管理读写事件。

def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
             read_chunk_size=4096):
    ##
    self._read_buffer = ""
    self._write_buffer = ""
    self.io_loop = io_loop or ioloop.IOLoop.instance()
    self.io_loop.add_handler(
        self.socket.fileno(), self._handle_events, self._state)
    ##

###2.2内部实现-主要函数###

IOStream._handl_events()根据对应events的类型,调用不同的callback

def _handle_events(self, fd, events):
    if events & self.io_loop.READ:
        self._handle_read()
    if events & self.io_loop.WRITE:
        self._handle_write()
    if events & self.io_loop.ERROR:
        self.close()
        return
    state = self.io_loop.ERROR
    if self._read_delimiter or self._read_bytes:
        state |= self.io_loop.READ
    if self._write_buffer:
        state |= self.io_loop.WRITE
    if state != self._state:
        self._state = state
        self.io_loop.update_handler(self.socket.fileno(), self._state)

#总结 在Tornado1.0版本中,IOLoop只考虑在单线程下的实现,对于多线程的处理并没有考虑,其函数并没有考虑跨线程调用对关键数据的保护。

例如对于_callbacks,暴露给所有的线程,单多线程情况下,可能会出现callback None的情况。 在Tornado4.1中,对于多线程的情况有了考虑,具体的见后序博文。

IOStream中,主要涉及到的是一个buffer的设计,内部使用了chunk,一个简易的块进行加快读写。这个的设计没有什么出彩之处,对于buffer的设计可以看看别的库是怎么设计的(TODO)。

文章转载至:https://my.oschina.net/u/247728/blog/380845
收藏 (0)

文章评论

共有0条评论来说两句吧...