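Please credit when reposting: TheViper http://www.survivalescaperooms.com/TheViper
tornado's routing mechanism was analyzed in "Changing tornado to a Rails-style form, with support for hidden parameters", so it is not analyzed again here.
My knowledge is limited, so this analysis mainly follows tornado's execution flow, drawing on the comments in the source and some material found online. It is based on tornado 3.2.2.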
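tornado's design model

tornado is divided into four layers. The EVENT layer at the bottom handles IO events. The TCP layer implements a TCP server and is responsible for data transfer. The HTTP/HTTPS layer implements an HTTP server and client on top of the HTTP protocol. The top layer is the web framework, which contains the handlers, templates, database connections, authentication, localization and the other features a web framework needs.
More precisely, the tornado server's workflow is as follows.

First, the listen sockets are created in the order socket (create the socket) -> bind -> listen, and the fd of each listen socket is registered with the singleton IOLoop instance. When a listen socket becomes readable, the registered callback is invoked to handle the client (accept_handler in this version; older versions used a method named _handle_events). While talking to the client, IOStream wraps the read and write buffers and performs the asynchronous reads and writes.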
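The socket -> bind -> listen -> register sequence can be tried out with the standard library alone. The sketch below is only an illustration of the idea, not tornado's code: selectors.DefaultSelector stands in for IOLoop, and on_readable and accepted are names made up for this example.

```python
import selectors
import socket

# Assumption: selectors.DefaultSelector plays the role of tornado's IOLoop.
selector = selectors.DefaultSelector()
accepted = []


def on_readable(listen_sock):
    """Callback run when the listen socket is readable: accept one client."""
    connection, address = listen_sock.accept()
    accepted.append(address)
    connection.close()


# socket -> bind -> listen, then register the fd for read events.
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listen_sock.listen(128)
listen_sock.setblocking(False)
selector.register(listen_sock, selectors.EVENT_READ, on_readable)

# A client connects, which makes the listen socket readable.
client = socket.create_connection(listen_sock.getsockname())

# One turn of the "event loop": dispatch each ready fd to its callback.
for key, events in selector.select(timeout=5):
    key.data(key.fileobj)

client.close()
selector.unregister(listen_sock)
listen_sock.close()
selector.close()
print(accepted)
```

The callback is stored as the data attached to the registration, which is roughly how an event loop keeps the mapping from fd to handler.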
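Now for the source code. I like to read code in the order it executes.
The execution flow in detail
Startup file
    from tornado import web, httpserver, ioloop

    # MainPageHandler is the application's own RequestHandler subclass.
    application = web.Application([
        (r"/", MainPageHandler),
    ])
    http_server = httpserver.HTTPServer(application)
    http_server.listen(8080)
    ioloop.IOLoop.instance().start()
This is the simplest way to start tornado. There are other ways; see the documentation.
First, the web.Application class is instantiated.
Several of its methods are worth noting: __init__(), listen(), add_handlers() and __call__().
__init__() initializes the Application. The handlers are usually passed in directly, and it calls add_handlers to add them. Initialization also sets up the transforms (chunking, compression and so on), the UI modules and the static file handler. add_handlers adds the mapping between URIs and handlers.
When a request arrives, tornado calls the Application instance, which triggers __call__. For details see the other article, "Changing tornado to a Rails-style form, with support for hidden parameters".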
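The reason __call__ matters is that the server only needs something callable. The following is a minimal sketch of that idea under simplifying assumptions: MiniApplication, main_page and the string "responses" are invented for illustration, and the real Application dispatches HTTPRequest objects to RequestHandler classes rather than paths to functions.

```python
import re


class MiniApplication:
    """Hypothetical stand-in for web.Application: maps URI patterns to handlers."""

    def __init__(self, handlers=None):
        self.handlers = []
        if handlers:
            self.add_handlers(handlers)

    def add_handlers(self, handlers):
        # Compile each pattern once; anchor the end so "/" does not match "/x".
        for pattern, handler in handlers:
            self.handlers.append((re.compile(pattern + "$"), handler))

    def __call__(self, path):
        # The server only needs a callable: calling the instance dispatches.
        for regex, handler in self.handlers:
            if regex.match(path):
                return handler(path)
        return "404 Not Found"


def main_page(path):
    return "200 OK: main page"


application = MiniApplication([(r"/", main_page)])
request_callback = application  # what an HTTP server would store and call
print(request_callback("/"))
print(request_callback("/missing"))
```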
listen(), for its part, is just a wrapper around listen in HTTPServer.
    def listen(self, port, address="", **kwargs):
        from tornado.httpserver import HTTPServer
        server = HTTPServer(self, **kwargs)
        server.listen(port, address)
The HTTPServer class.
It is in httpserver.py.
A non-blocking, single-threaded HTTP server. A server is defined by a request callback that takes an HTTPRequest instance as an argument and writes a valid HTTP response with `HTTPRequest.write`. `HTTPRequest.finish` finishes the request (but does not necessarily close the connection in the case of HTTP/1.1 keep-alive requests).
In other words, HTTPServer is a non-blocking, single-threaded HTTP server. It supports HTTP/1.1 keep-alive connections but does not implement chunked encoding. The server supports the 'X-Real-IP' and 'X-Scheme' headers as well as SSL, and supports multiple processes in a prefork model. The code is simple.
    class HTTPServer(TCPServer):
        def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
                     xheaders=False, ssl_options=None, protocol=None, **kwargs):
            self.request_callback = request_callback
            self.no_keep_alive = no_keep_alive
            self.xheaders = xheaders
            self.protocol = protocol
            TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                               **kwargs)

        def handle_stream(self, stream, address):
            HTTPConnection(stream, address, self.request_callback,
                           self.no_keep_alive, self.xheaders, self.protocol)
self.request_callback here is the Application instance from above.
The TCPServer class, in tcpserver.py
    class TCPServer(object):
        def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None):
            self.io_loop = io_loop
            self.ssl_options = ssl_options
            self._sockets = {}  # fd -> socket object; stores the listen sockets
            self._pending_sockets = []
            self._started = False
            self.max_buffer_size = max_buffer_size
            if self.ssl_options is not None and isinstance(self.ssl_options, dict):
                ...

        def listen(self, port, address=""):
            sockets = bind_sockets(port, address=address)
            self.add_sockets(sockets)

        def add_sockets(self, sockets):
            if self.io_loop is None:
                self.io_loop = IOLoop.current()
            for sock in sockets:
                self._sockets[sock.fileno()] = sock
                add_accept_handler(sock, self._handle_connection,
                                   io_loop=self.io_loop)

        def _handle_connection(self, connection, address):
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl_wrap_socket(connection,
                                                 self.ssl_options,
                                                 server_side=True,
                                                 do_handshake_on_connect=False)
                except ssl.SSLError as err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error as err:
                    # If the connection is closed immediately after it is created
                    # (as in a port scan), we can get one of several errors.
                    # wrap_socket makes an internal call to getpeername,
                    # which may return either EINVAL (Mac OS X) or ENOTCONN
                    # (Linux). If it returns ENOTCONN, this error is
                    # silently swallowed by the ssl module, so we need to
                    # catch another error later on (AttributeError in
                    # SSLIOStream._do_ssl_handshake).
                    # To test this behavior, try nmap with the -sT flag.
                    # https://github.com/tornadoweb/tornado/pull/750
                    if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):
                        return connection.close()
                    else:
                        raise
            try:
                if self.ssl_options is not None:
                    stream = SSLIOStream(connection, io_loop=self.io_loop,
                                         max_buffer_size=self.max_buffer_size)
                else:
                    stream = IOStream(connection, io_loop=self.io_loop,
                                      max_buffer_size=self.max_buffer_size)
                self.handle_stream(stream, address)
            except Exception:
                app_log.error("Error in connection callback", exc_info=True)
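The code that tcpserver.py runs when tornado is started in other ways is omitted here.
Once __init__() returns, http_server = httpserver.HTTPServer(application) has finished executing and we have an HTTPServer object.
Next comes http_server.listen(8080), which is in the TCPServer class above.
bind_sockets() is in netutil.py. It creates the socket instances and returns them in a list.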
    def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128,
                     flags=None):
        sockets = []
        if address == "":
            address = None
        if not socket.has_ipv6 and family == socket.AF_UNSPEC:
            # Python can be compiled with --disable-ipv6, which causes
            # Operations on AF_INET6 sockets to fail, but does not
            # automatically exclude those results from getaddrinfo
            # results.
            # http://bugs.python.org/issue16208
            family = socket.AF_INET
        if flags is None:
            flags = socket.AI_PASSIVE
        for res in set(socket.getaddrinfo(address, port, family,
                                          socket.SOCK_STREAM, 0, flags)):
            af, socktype, proto, canonname, sockaddr = res
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as e:
                if e.args[0] == errno.EAFNOSUPPORT:
                    continue
                raise
            set_close_exec(sock.fileno())
            if os.name != 'nt':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6. On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6. For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
            sock.setblocking(0)
            sock.bind(sockaddr)
            sock.listen(backlog)
            sockets.append(sock)
        return sockets
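bind_sockets is called while the listening port is being set up. getaddrinfo returns one entry for each address (and address family) that the given host resolves to, which covers all of the server's interfaces when no address is given. A listening socket is created for each entry, and the list of sockets is returned. While each socket is created it is bound to the address and port, and two flags are set: fcntl.FD_CLOEXEC (so the open socket is closed in a child process when it calls exec) and socket.SO_REUSEADDR (so the port can be bound again right after a socket on it is closed, i.e. port reuse). sock.listen(backlog), with backlog defaulting to 128, sets the maximum number of connections waiting to be accepted.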
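The core of bind_sockets can be reduced to a few lines. This is a simplified sketch rather than tornado's implementation: bind_listen_sockets is a made-up name, and the close-on-exec, IPV6_V6ONLY and Windows handling shown in the real function are left out.

```python
import socket


def bind_listen_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128):
    """Simplified take on bind_sockets: one listening socket per resolved address."""
    sockets = []
    infos = socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                               0, socket.AI_PASSIVE)
    for af, socktype, proto, _canonname, sockaddr in set(infos):
        sock = socket.socket(af, socktype, proto)
        # Allow rebinding the port while old connections linger in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(False)
        sock.bind(sockaddr)
        sock.listen(backlog)
        sockets.append(sock)
    return sockets


# Restricted to IPv4 loopback and port 0 (OS-chosen) so the sketch is easy to run.
sockets = bind_listen_sockets(0, "127.0.0.1", family=socket.AF_INET)
for sock in sockets:
    print(sock.family, sock.getsockname())
```

Note that nothing here calls accept: the sockets are only bound and listening, which is exactly the state bind_sockets leaves them in.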
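TCP server programming can be summed up as: create a listening socket, bind it to an address and port, start listening, then keep calling accept. bind_sockets() above only does the bind and listen parts.

On sockets I strongly recommend the article "Linux study notes: sockets". It is very well written and explains many of the details in bind_sockets() above.
Back in tcpserver.py: leave IOLoop aside for now. It is enough to know that it is a singleton obtained through IOLoop.current().
self.add_sockets(sockets) iterates over sockets and saves each one in self._sockets.
It then calls add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop), which is in netutil.py.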
    def add_accept_handler(sock, callback, io_loop=None):
        if io_loop is None:
            io_loop = IOLoop.current()

        def accept_handler(fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    # EWOULDBLOCK and EAGAIN indicate we have accepted every
                    # connection that is available.
                    if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                        return
                    # ECONNABORTED indicates that there was a connection
                    # but it was closed while still in the accept queue.
                    # (observed on FreeBSD).
                    if e.args[0] == errno.ECONNABORTED:
                        continue
                    raise
                callback(connection, address)
        io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
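The accept_handler() defined inside it performs the accept step that was left undone above. The callback parameter is _handle_connection() from tcpserver.py, which is called once a client connection has been accepted. It is simple: leaving the SSL handling aside, it just creates an IOStream object.
Note that from this point on we assume tornado has received a connection request from a client and is starting to process it. Everything before this was initialization.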
    def _handle_connection(self, connection, address):
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl_wrap_socket(connection,
                                             self.ssl_options,
                                             server_side=True,
                                             do_handshake_on_connect=False)
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux). If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(connection, io_loop=self.io_loop,
                                     max_buffer_size=self.max_buffer_size)
            else:
                stream = IOStream(connection, io_loop=self.io_loop,
                                  max_buffer_size=self.max_buffer_size)
            self.handle_stream(stream, address)
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)
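handle_stream() is covered later.
Next is the last line of add_accept_handler(): io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ).
This registers the read event on the fd, together with the callback accept_handler, with the IOLoop object. The callback is a closure defined on the spot. It is an IOLoop-level callback and is called every time the event fires. All it does is call accept to obtain the new socket and the client address, then call callback to pass the event up to the layer above. So when a read event occurs, accept_handler is called, which in turn calls callback, i.e. _handle_connection.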
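The "accept until the queue is empty" loop can be reproduced with plain sockets. In this sketch, which is an approximation and not tornado's code, select.select stands in for the IOLoop read event, handle_connection stands in for _handle_connection, and Python 3's BlockingIOError and ConnectionAbortedError replace the errno checks.

```python
import select
import socket
import time

listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.bind(("127.0.0.1", 0))
listen_sock.listen(128)
listen_sock.setblocking(False)

handled = []


def handle_connection(connection, address):
    """Stands in for TCPServer._handle_connection (the upper-layer callback)."""
    handled.append(address)
    connection.close()


def accept_handler(callback):
    """Accept until the kernel's queue is empty, passing each socket upward."""
    while True:
        try:
            connection, address = listen_sock.accept()
        except BlockingIOError:
            return  # EWOULDBLOCK/EAGAIN: nothing left to accept
        except ConnectionAbortedError:
            continue  # the peer gave up while still queued
        callback(connection, address)


clients = [socket.create_connection(listen_sock.getsockname()) for _ in range(2)]

# Wait for read events and drain on each one until both clients are handled.
deadline = time.monotonic() + 5
while len(handled) < 2 and time.monotonic() < deadline:
    readable, _, _ = select.select([listen_sock], [], [], 0.5)
    if readable:
        accept_handler(handle_connection)

for client in clients:
    client.close()
listen_sock.close()
print(len(handled))
```

Looping inside the handler means one read event can pick up several pending connections, which is why the socket has to be non-blocking.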
Last is self.handle_stream(stream, address) inside _handle_connection() in tcpserver.py.
It is implemented in the subclass HTTPServer.
    def handle_stream(self, stream, address):
        HTTPConnection(stream, address, self.request_callback,
                       self.no_keep_alive, self.xheaders, self.protocol)
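The split between TCPServer and HTTPServer is the usual template-method arrangement: the base class owns the connection plumbing and the subclass fills in handle_stream. A toy version, with MiniTCPServer and MiniHTTPServer as invented names and a tuple standing in for IOStream, might look like this:

```python
class MiniTCPServer:
    """Hypothetical base class: owns the connection plumbing."""

    def _handle_connection(self, connection, address):
        stream = ("stream", connection)  # stands in for IOStream(connection)
        self.handle_stream(stream, address)

    def handle_stream(self, stream, address):
        raise NotImplementedError("subclasses decide what to do with a stream")


class MiniHTTPServer(MiniTCPServer):
    """Hypothetical subclass: supplies the protocol-specific step."""

    def __init__(self, request_callback):
        self.request_callback = request_callback

    def handle_stream(self, stream, address):
        # tornado creates an HTTPConnection here; the sketch calls back directly.
        self.request_callback(stream, address)


seen = []
server = MiniHTTPServer(lambda stream, address: seen.append((stream, address)))
server._handle_connection("fake-socket", ("127.0.0.1", 54321))
print(seen)
```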
This creates an HTTPConnection object. It wraps some IOStream operations and handles the HTTP client's connection and its HTTP requests, running the request callback until the HTTP connection is closed.
    class HTTPConnection(object):
        def __init__(self, stream, address, request_callback, no_keep_alive=False,
                     xheaders=False, protocol=None):
            self.stream = stream
            self.address = address
            # Save the socket's address family now so we know how to
            # interpret self.address even after the stream is closed
            # and its socket attribute replaced with None.
            self.address_family = stream.socket.family
            self.request_callback = request_callback
            self.no_keep_alive = no_keep_alive
            self.xheaders = xheaders
            self.protocol = protocol
            self._clear_request_state()
            # Save stack context here, outside of any request. This keeps
            # contexts from one request from leaking into the next.
            self._header_callback = stack_context.wrap(self._on_headers)
            self.stream.set_close_callback(self._on_connection_close)
            self.stream.read_until(b"\r\n\r\n", self._header_callback)
self.request_callback is the Application instance from earlier, and stream is an IOStream instance.
_clear_request_state() clears the per-request state, so that a finished request can be garbage-collected; it is also run when the connection is closed.
    def _clear_request_state(self):
        self._request = None
        self._request_finished = False
        self._write_callback = None
        self._close_callback = None
self._header_callback = stack_context.wrap(self._on_headers) does some bookkeeping for the execution context of the asynchronous callback; as the source comment says, the context is saved outside of any request so that contexts from one request do not leak into the next. Lower versions do not have this and simply call self.stream.read_until(b"\r\n\r\n", self._on_headers). I do not fully understand it.
Next, self.stream.set_close_callback(self._on_connection_close) sets the callback to run when the connection is closed.
    def set_close_callback(self, callback):
        self._close_callback = stack_context.wrap(callback)

    def _on_connection_close(self):
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        # Delete any unfinished callbacks to break up reference cycles.
        self._header_callback = None
        self._clear_request_state()
Last and most important is self.stream.read_until(b"\r\n\r\n", self._header_callback). As the name suggests, it keeps reading until \r\n\r\n is seen (which normally marks the end of the request headers) and then calls the callback _on_headers (this is an IOStream-level callback).
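Without going into IOStream itself, the "read until a delimiter, then call back" idea can be sketched with a plain buffer. DelimitedReader and its feed method are hypothetical names for this illustration; the real IOStream also handles non-blocking socket reads, buffer limits and the IOLoop.

```python
class DelimitedReader:
    """Hypothetical buffer that fires a callback once a delimiter has arrived."""

    def __init__(self):
        self._buffer = b""
        self._delimiter = None
        self._callback = None

    def read_until(self, delimiter, callback):
        self._delimiter = delimiter
        self._callback = callback
        self._try_deliver()

    def feed(self, chunk):
        """Called whenever the socket yields more bytes."""
        self._buffer += chunk
        self._try_deliver()

    def _try_deliver(self):
        if self._callback is None:
            return
        index = self._buffer.find(self._delimiter)
        if index == -1:
            return  # delimiter not seen yet: keep buffering
        end = index + len(self._delimiter)
        data, self._buffer = self._buffer[:end], self._buffer[end:]
        callback, self._callback = self._callback, None
        callback(data)


received = []
reader = DelimitedReader()
reader.read_until(b"\r\n\r\n", received.append)
reader.feed(b"GET / HTTP/1.1\r\nHost: exa")    # partial headers: no callback yet
reader.feed(b"mple.com\r\n\r\nleftover-body")  # the delimiter arrives
print(received)
```

Whatever follows the delimiter stays in the buffer, which is what lets a later read pick up the request body.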
IOStream's read_until is not discussed here. First, _on_headers.
    def _on_headers(self, data):
        try:
            data = native_str(data.decode('latin1'))
            eol = data.find("\r\n")
            start_line = data[:eol]
            try:
                method, uri, version = start_line.split(" ")
            except ValueError:
                raise _BadRequestException("Malformed HTTP request line")
            if not version.startswith("HTTP/"):
                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")
            try:
                headers = httputil.HTTPHeaders.parse(data[eol:])
            except ValueError:
                # Probably from split() if there was no ':' in the line
                raise _BadRequestException("Malformed HTTP headers")

            # HTTPRequest wants an IP, not a full socket address
            if self.address_family in (socket.AF_INET, socket.AF_INET6):
                remote_ip = self.address[0]
            else:
                # Unix (or other) socket; fake the remote address
                remote_ip = '0.0.0.0'

            self._request = HTTPRequest(
                connection=self, method=method, uri=uri, version=version,
                headers=headers, remote_ip=remote_ip, protocol=self.protocol)

            content_length = headers.get("Content-Length")
            if content_length:
                content_length = int(content_length)
                if content_length > self.stream.max_buffer_size:
                    raise _BadRequestException("Content-Length too long")
                if headers.get("Expect") == "100-continue":
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
                self.stream.read_bytes(content_length, self._on_request_body)
                return

            self.request_callback(self._request)
        except _BadRequestException as e:
            gen_log.info("Malformed HTTP request from %r: %s",
                         self.address, e)
            self.close()
            return
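data = native_str(data.decode('latin1')): data here is the request content that IOStream read from the socket, that is, the raw request line followed by the headers.

From this, method, uri and version can be parsed out, and then the headers.
The parsed header information is passed to HTTPRequest() to create an HTTPRequest object.
content_length = headers.get("Content-Length"): when the request has no Content-Length header, as with a typical GET, control goes straight to self.request_callback(self._request) further down, which brings us back to __call__(self, request) in the Application class.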
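To make the parsing steps concrete, here is a small standalone walk-through. The request text is a made-up example, and the header loop is a simplification of httputil.HTTPHeaders.parse (a plain dict, no continuation lines, no case-insensitive lookup).

```python
# Hypothetical request text, as it might look once the header block has been read.
data = (
    "GET /index?x=1 HTTP/1.1\r\n"
    "Host: localhost:8080\r\n"
    "Accept: */*\r\n"
    "\r\n"
)

eol = data.find("\r\n")
start_line = data[:eol]
method, uri, version = start_line.split(" ")
if not version.startswith("HTTP/"):
    raise ValueError("Malformed HTTP version in HTTP Request-Line")

# Simplified header parsing: one "Name: value" per line.
headers = {}
for line in data[eol:].split("\r\n"):
    if line:
        name, value = line.split(":", 1)
        headers[name.strip()] = value.strip()

# The same branch as in _on_headers: a body is only read if a length is given.
content_length = headers.get("Content-Length")
if content_length:
    next_step = "read %d body bytes, then dispatch" % int(content_length)
else:
    next_step = "dispatch to request_callback immediately"

print(method, uri, version)
print(headers)
print(next_step)
```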
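Now for HTTPRequest. It represents the client's request and carries all the information about it. Using it in practice means either reading the request parameters that were passed into it or calling methods on the HTTPConnection that was passed into it.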
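That relationship, a request object that holds the parsed data and forwards I/O to its connection, can be sketched as follows. FakeConnection and MiniRequest are invented for illustration and only mirror the write and finish methods mentioned in the HTTPServer docstring quoted earlier.

```python
class FakeConnection:
    """Hypothetical stand-in for HTTPConnection: collects what is written."""

    def __init__(self):
        self.written = []
        self.finished = False

    def write(self, chunk):
        self.written.append(chunk)

    def finish(self):
        self.finished = True


class MiniRequest:
    """Hypothetical request object: holds parsed data, delegates I/O."""

    def __init__(self, method, uri, headers, connection):
        self.method = method
        self.uri = uri
        self.headers = headers
        self.connection = connection

    def write(self, chunk):
        self.connection.write(chunk)  # the request itself does no I/O

    def finish(self):
        self.connection.finish()


connection = FakeConnection()
request = MiniRequest("GET", "/", {"Host": "localhost"}, connection)
request.write(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nhi")
request.finish()
print(request.method, request.uri, connection.finished)
```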