国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

tornado源碼分析之http和tcp層

2019-11-14 17:28:20
字體:
來源:轉載
供稿:網友

轉載請注明: TheViper http://www.survivalescaperooms.com/TheViper

將tornado改成rails的風格形式,并可以設置隱藏參數中分析了tornado的路由機制,這里不再分析。

本屌才疏學淺,所以分析主要是以tornado的執行流程為線索,配合源碼的注釋和網上的一些資料來說明。基于tornado 3.2.2

tornado的設計模型

可以看到,tornado分為四層:最底層的EVENT層處理IO事件;TCP層實現了TCP服務器,負責數據傳輸;HTTP/HTTPS層基于HTTP協議實現了HTTP服務器和客戶端;最上層為WEB框架,包含了處理器、模板數據庫連接、認證、本地化等等WEB框架需要具備的功能。

 更確切點,tornado服務器的工作流程是這樣的。

首先按照socket(創建套接字)->bind(綁定)->listen(監聽)的順序創建listen socket監聽客戶端,并將每個listen socket的fd注冊到IOLoop的單例實例中;當listen socket可讀時回調_handle_events處理客戶端請求;在與客戶端通信的過程中使用IOStream封裝了讀、寫緩沖區,實現與客戶端的異步讀寫。

下面開始源碼部分,個人喜歡按照執行流程的順序看代碼。

 具體執行流程

啟動文件

        application = web.Application([            (r"/", MainPageHandler),        ])        http_server = httpserver.HTTPServer(application)        http_server.listen(8080)        ioloop.IOLoop.instance().start()

 這是tornado最簡單的一種啟動方式,當然還有其他方式,請參考文檔。

 首先是實例化了web.Application這個類。

里面這幾個方法值得注意,__init__(),listen(),add_handlers(),__call__()。

__init__()初始化Application類,一般將處理器直接傳入,它會調用 add_handlers 添加這些處理器,初始化還包括 transforms (分塊、壓縮等)、UI模塊、靜態文件處理器的初始化。 add_handlers 方法負責添加URI和處理器的映射。

當監聽到請求時,tornado通過調用 Application 實例觸發 __call__.具體參見另一篇將tornado改成rails的風格形式,并可以設置隱藏參數

而listen()只是對HTTPServer 中的 listen 的封裝。

    def listen(self, port, address="", **kwargs):        from tornado.httpserver import HTTPServer        server = HTTPServer(self, **kwargs)        server.listen(port, address)

 

HTTPServer類。

 在httpserver.py里面。

    A non-blocking, single-threaded HTTP server.    A server is defined by a request callback that takes an HTTPRequest    instance as an argument and writes a valid HTTP response with    `HTTPRequest.write`. `HTTPRequest.finish` finishes the request (but does    not necessarily close the connection in the case of HTTP/1.1 keep-alive    requests).

HTTPServer是一個無阻塞、單線程HTTP服務器。支持HTTP/1.1協議keep-alive連接,但不支持chunked encoding。服務器支持’X-Real-IP’和’X-Scheme’頭以及SSL傳輸,支持多進程為prefork模式實現。代碼很簡單。

class HTTPServer(TCPServer):        def __init__(self, request_callback, no_keep_alive=False, io_loop=None,                 xheaders=False, ssl_options=None, protocol=None, **kwargs):        self.request_callback = request_callback        self.no_keep_alive = no_keep_alive        self.xheaders = xheaders        self.protocol = protocol        TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,                           **kwargs)    def handle_stream(self, stream, address):        HTTPConnection(stream, address, self.request_callback,                       self.no_keep_alive, self.xheaders, self.protocol)

 里面的self.request_callback是上面的Application實例。

TCPServer類,在tcpserver.py里面

class TCPServer(object):        def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None):        self.io_loop = io_loop        self.ssl_options = ssl_options        self._sockets = {}  # fd -> socket object,保存socket        self._pending_sockets = []        self._started = False        self.max_buffer_size = max_buffer_size        if self.ssl_options is not None and isinstance(self.ssl_options, dict):            ...    def listen(self, port, address=""):        sockets = bind_sockets(port, address=address)        self.add_sockets(sockets)    def add_sockets(self, sockets):        if self.io_loop is None:            self.io_loop = IOLoop.current()        for sock in sockets:            self._sockets[sock.fileno()] = sock            add_accept_handler(sock, self._handle_connection,                               io_loop=self.io_loop)    def _handle_connection(self, connection, address):        if self.ssl_options is not None:            assert ssl, "Python 2.6+ and OpenSSL required for SSL"            try:                connection = ssl_wrap_socket(connection,                                             self.ssl_options,                                             server_side=True,                                             do_handshake_on_connect=False)            except ssl.SSLError as err:                if err.args[0] == ssl.SSL_ERROR_EOF:                    return connection.close()                else:                    raise            except socket.error as err:                # If the connection is closed immediately after it is created                # (as in a port scan), we can get one of several errors.                # wrap_socket makes an internal call to getpeername,                # which may return either EINVAL (Mac OS X) or ENOTCONN                # (linux).  If it returns ENOTCONN, this error is                # silently swallowed by the ssl module, so we need to                # catch another error later on (AttributeError in                # SSLIOStream._do_ssl_handshake).                # To test this behavior, try nmap with the -sT flag.                # https://github.com/tornadoweb/tornado/pull/750                if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):                    return connection.close()                else:                    raise        try:            if self.ssl_options is not None:                stream = SSLIOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            else:                stream = IOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            self.handle_stream(stream, address)        except Exception:            app_log.error("Error in connection callback", exc_info=True)

這里省略了用其他方式啟動tornado時,在tcpserver.py中執行的代碼。

到__init__()時,httpserver=httpserver.HTTPServer(application)就執行完畢,得到一個HTTPServer對象。

接著是http_server.listen(8080),在上面TCPServer類里面。

bind_sockets()在netutil.py里面,用來創建socket實例的,返回一個含有socket實例的列表。

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):    sockets = []    if address == "":        address = None    if not socket.has_ipv6 and family == socket.AF_UNSPEC:        # Python can be compiled with --disable-ipv6, which causes        # Operations on AF_INET6 sockets to fail, but does not        # automatically exclude those results from getaddrinfo        # results.        # http://bugs.python.org/issue16208        family = socket.AF_INET    if flags is None:        flags = socket.AI_PASSIVE    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,                                      0, flags)):        af, socktype, proto, canonname, sockaddr = res        try:            sock = socket.socket(af, socktype, proto)        except socket.error as e:            if e.args[0] == errno.EAFNOSUPPORT:                continue            raise        set_close_exec(sock.fileno())        if os.name != 'nt':            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)        if af == socket.AF_INET6:            # On linux, ipv6 sockets accept ipv4 too by default,            # but this makes it impossible to bind to both            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,            # separate sockets *must* be used to listen for both ipv4            # and ipv6.  For consistency, always disable ipv4 on our            # ipv6 sockets and use a separate ipv4 socket when needed.            #            # Python 2.x on windows doesn't have IPPROTO_IPV6.            if hasattr(socket, "IPPROTO_IPV6"):                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)        sock.setblocking(0)        sock.bind(sockaddr)        sock.listen(backlog)        sockets.append(sock)    return sockets

bind_sockets在啟動監聽端口過程中調用,getaddrinfo返回服務器的所有網卡信息, 每塊網卡上都要創建監聽客戶端的請求并返回創建的sockets。創建socket過程中綁定地址和端口,同時設置了fcntl.FD_CLOEXEC(創建子進程時關閉打開的socket)和socket.SO_REUSEADDR(保證某一socket關閉后立即釋放端口,實現端口復用)標志位。sock.listen(backlog=128)默認設定等待被處理的連接最大個數為128。

 對于 TCP 編程的總結就是:創建一個監聽 socket,然后把它綁定到端口和地址上并開始監聽,然后不停 accept。上面的bind_sockets()只做了綁定和監聽。

關于套接字,強烈推薦這一篇linux學習筆記:套接字,寫的很好。上面bind_sockets()里的些細節在里面都有很詳細的說明。

 

然后是tcpserver.py里面,IOLoop先不用管它,先只用知道它是單例的,通過IOLoop.current()獲取。

self.add_sockets(sockets),遍歷sockets,保存到self._sockets.

add_accept_handler(sock, self._handle_connection,io_loop=self.io_loop)。在netutil.py里面.

def add_accept_handler(sock, callback, io_loop=None):    if io_loop is None:        io_loop = IOLoop.current()            def accept_handler(fd, events):        while True:            try:                connection, address = sock.accept()            except socket.error as e:                # EWOULDBLOCK and EAGAIN indicate we have accepted every                # connection that is available.                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):                    return                # ECONNABORTED indicates that there was a connection                # but it was closed while still in the accept queue.                # (observed on FreeBSD).                if e.args[0] == errno.ECONNABORTED:                    continue                raise            callback(connection, address)                io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)

里面定義的accept_handler()負責完成上面沒完成的socket的接收。參數callback是tcpserver.py里面的_handle_connection()._handle_connection()在接受客戶端的連接處理結束之后會被調用.里面很簡單了,拋開ssl的處理,就是創建IOStream對象了。

注意,從這里開始,我們就假設tornado接收到了客戶端的連接請求,開始處理請求。在這之前的行為都是在初始化。

    def _handle_connection(self, connection, address):        if self.ssl_options is not None:            assert ssl, "Python 2.6+ and OpenSSL required for SSL"            try:                connection = ssl_wrap_socket(connection,                                             self.ssl_options,                                             server_side=True,                                             do_handshake_on_connect=False)            except ssl.SSLError as err:                if err.args[0] == ssl.SSL_ERROR_EOF:                    return connection.close()                else:                    raise            except socket.error as err:                # If the connection is closed immediately after it is created                # (as in a port scan), we can get one of several errors.                # wrap_socket makes an internal call to getpeername,                # which may return either EINVAL (Mac OS X) or ENOTCONN                # (Linux).  If it returns ENOTCONN, this error is                # silently swallowed by the ssl module, so we need to                # catch another error later on (AttributeError in                # SSLIOStream._do_ssl_handshake).                # To test this behavior, try nmap with the -sT flag.                # https://github.com/tornadoweb/tornado/pull/750                if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):                    return connection.close()                else:                    raise        try:            if self.ssl_options is not None:                stream = SSLIOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            else:                stream = IOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            self.handle_stream(stream, address)        except Exception:            app_log.error("Error in connection callback", exc_info=True)

handle_stream()后面會說。

接著是add_accept_handler()中最后的io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ).

這個是向 loloop 對象注冊在fd上的read事件和回調函數accept_handler。該回調函數是現成定義的,屬于IOLoop層次的回調,每當事件發生時就會調用。回調內容也就是accept得到新socket和客戶端地址,然后調用callback向上層傳遞事件。從上面的分析可知,當read事件發生時,accept_handler被調用,進而callback=_handle_connection被調用。

最后是tcpserver.py中_handle_connection()里面的self.handle_stream(stream, address)。

這個在子類HTTPServer中實現的。

    def handle_stream(self, stream, address):        HTTPConnection(stream, address, self.request_callback,                       self.no_keep_alive, self.xheaders, self.protocol)

創建HTTPConnection對象,它封裝了IOStream的一些操作,用來處理http客戶端的連接還有http請求的,執行request回調,直到http連接關閉。

class HTTPConnection(object):    def __init__(self, stream, address, request_callback, no_keep_alive=False,                 xheaders=False, protocol=None):        self.stream = stream        self.address = address        # Save the socket's address family now so we know how to        # interpret self.address even after the stream is closed        # and its socket attribute replaced with None.        self.address_family = stream.socket.family        self.request_callback = request_callback        self.no_keep_alive = no_keep_alive        self.xheaders = xheaders        self.protocol = protocol        self._clear_request_state()        # Save stack context here, outside of any request.  This keeps        # contexts from one request from leaking into the next.        self._header_callback = stack_context.wrap(self._on_headers)        self.stream.set_close_callback(self._on_connection_close)        self.stream.read_until(b"/r/n/r/n", self._header_callback)

 self.request_callback是前面的Application實例,stream是IOStream實例.

_clear_request_state():在請求被當做垃圾回收或連接關閉時清空請求狀態。

    def _clear_request_state(self):        self._request = None        self._request_finished = False        self._write_callback = None        self._close_callback = None

self._header_callback = stack_context.wrap(self._on_headers),它對多線程執行環境的上下文做了一些維護。這個在低一點的版本里沒有,都是直接self.stream.read_until(b"/r/n/r/n", self._on_headers),搞不懂。

 然后是 self.stream.set_close_callback(self._on_connection_close)設置關閉連接時的回調。

    def set_close_callback(self, callback):        self._close_callback = stack_context.wrap(callback)    def _on_connection_close(self):        if self._close_callback is not None:            callback = self._close_callback            self._close_callback = None            callback()        # Delete any unfinished callbacks to break up reference cycles.        self._header_callback = None        self._clear_request_state()

 最后是最重要的self.stream.read_until(b"/r/n/r/n", self._header_callback).顧名思義,就是一直讀取知道/r/n/r/n(這一般意味這請求頭的結束),然后調用回調函數_on_headers(這是 IOStream 層次的回調)。

這里先不討論IOstream里的read_until。先說_on_headers.

def _on_headers(self, data):        try:            data = native_str(data.decode('latin1'))            eol = data.find("/r/n")            start_line = data[:eol]            try:                method, uri, version = start_line.split(" ")            except ValueError:                raise _BadRequestException("Malformed HTTP request line")            if not version.startswith("HTTP/"):                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")            try:                headers = httputil.HTTPHeaders.parse(data[eol:])            except ValueError:                # Probably from split() if there was no ':' in the line                raise _BadRequestException("Malformed HTTP headers")            # HTTPRequest wants an IP, not a full socket address            if self.address_family in (socket.AF_INET, socket.AF_INET6):                remote_ip = self.address[0]            else:                # Unix (or other) socket; fake the remote address                remote_ip = '0.0.0.0'            self._request = HTTPRequest(                connection=self, method=method, uri=uri, version=version,                headers=headers, remote_ip=remote_ip, protocol=self.protocol)            content_length = headers.get("Content-Length")            if content_length:                content_length = int(content_length)                if content_length > self.stream.max_buffer_size:                    raise _BadRequestException("Content-Length too long")                if headers.get("Expect") == "100-continue":                    self.stream.write(b"HTTP/1.1 100 (Continue)/r/n/r/n")                self.stream.read_bytes(content_length, self._on_request_body)                return            self.request_callback(self._request)        except _BadRequestException as e:            gen_log.info("Malformed HTTP request from %r: %s",                         self.address, e)            self.close()            return

 

 data = native_str(data.decode('latin1'))。里面的data是IOStream解析socket接收到的請求內容。類似于,

 然后由此可以解析出method, uri, version.然后進一步解析header.

將解析出來的header信息,傳入HTTPRequest(),創建HTTPRequest對象,

content_length = headers.get("Content-Length"),上面這個例子頭信息沒有Content-Length,所以直接走后面的self.request_callback(self._request)。就回到Application類里面的__call__(self,request)了。

 下面說HTTPRequest,它是客戶端請求的代表,它攜帶了所有和客戶端請求的信息。對它的調用實際上是,HTTPRequest在訪問傳入它的那些請求參數或調用傳入它的HTTPConnection里面的方法。

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 绥棱县| 文化| 衡阳县| 明光市| 南郑县| 黄龙县| 元氏县| 柳州市| 洪湖市| 新和县| 万源市| 彭水| 嵊州市| 乡城县| 宾川县| 都匀市| 田林县| 清丰县| 奉化市| 磴口县| 卓资县| 隆子县| 文水县| 清水县| 涟源市| 田阳县| 深州市| 昌邑市| 四川省| 论坛| 福建省| 金堂县| 吴川市| 定日县| 晋宁县| 黔西| 平远县| 宜宾县| 喀什市| 邢台市| 乐至县|