国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

詳解Python的Twisted框架中reactor事件管理器的用法

2019-11-25 16:46:47
字體:
來源:轉載
供稿:網友

鋪墊
在大量的實踐中,似乎我們總是通過類似的方式來使用異步編程:

  • 監聽事件
  • 事件發生執行對應的回調函數
  • 回調完成(可能產生新的事件添加進監聽隊列)
  • 回到1,監聽事件

因此我們將這樣的異步模式稱為Reactor模式,例如在iOS開發中的Run Loop概念,實際上非常類似于Reactor loop,主線程的Run Loop監聽屏幕UI事件,一旦發生UI事件則執行對應的事件處理代碼,還可以通過GCD等方式產生事件至主線程執行。

2016525114543434.png (524×364)

上圖是boost對Reactor模式的描繪,Twisted的設計就是基于這樣的Reactor模式,Twisted程序就是在等待事件、處理事件的過程中不斷循環。

from twisted.internet import reactorreactor.run()

reactor是Twisted程序中的單例對象。

reactor
reactor是事件管理器,用于注冊、注銷事件,運行事件循環,當事件發生時調用回調函數處理。關于reactor有下面幾個結論:

  • Twisted的reactor只有通過調用reactor.run()來啟動。
  • reactor循環是在其開始的進程中運行,也就是運行在主進程中。
  • 一旦啟動,就會一直運行下去。reactor就會在程序的控制下(或者具體在一個啟動它的線程的控制下)。
  • reactor循環并不會消耗任何CPU的資源。
  • 并不需要顯式的創建reactor,只需要引入就OK了。

最后一條需要解釋清楚。在Twisted中,reactor是Singleton(也就是單例模式),即在一個程序中只能有一個reactor,并且只要你引入它就相應地創建一個。上面引入的方式這是twisted默認使用的方法,當然了,twisted還有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系統調用來poll來代替select方法。

若使用其它的reactor,需要在引入twisted.internet.reactor前安裝它。下面是安裝pollreactor的方法:

from twisted.internet import pollreactorpollreactor.install()

如果你沒有安裝其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted會根據操作系統安裝默認的reactor。正因為如此,習慣性做法不要在最頂層的模塊內引入reactor以避免安裝默認reactor,而是在你要使用reactor的區域內安裝。
下面是使用 pollreactor重寫上上面的程序:

from twited.internet import pollreactorpollreactor.install()from twisted.internet import reactorreactor.run()

那么reactor是如何實現單例的?來看一下from twisted.internet import reactor做了哪些事情就并明白了。

下面是twisted/internet/reactor.py的部分代碼:

# twisted/internet/reactor.pyimport sysdel sys.modules['twisted.internet.reactor']from twisted.internet import defaultdefault.install()

注:Python中所有加載到內存的模塊都放在sys.modules,它是一個全局字典。當import一個模塊時首先會在這個列表中查找是否已經加載了此模塊,如果加載了則只是將模塊的名字加入到正在調用import的模塊的命名空間中。如果沒有加載則從sys.path目錄中按照模塊名稱查找模塊文件,找到后將模塊載入內存,并加入到sys.modules中,并將名稱導入到當前的命名空間中。

假如我們是第一次運行from twisted.internet import reactor,因為sys.modules中還沒有twisted.internet.reactor,所以會運行reactory.py中的代碼,安裝默認的reactor。之后,如果導入的話,因為sys.modules中已存在該模塊,所以會直接將sys.modules中的twisted.internet.reactor導入到當前命名空間。

default中的install:

# twisted/internet/default.pydef _getInstallFunction(platform):  """  Return a function to install the reactor most suited for the given platform.  @param platform: The platform for which to select a reactor.  @type platform: L{twisted.python.runtime.Platform}  @return: A zero-argument callable which will install the selected    reactor.  """  try:    if platform.isLinux():      try:        from twisted.internet.epollreactor import install      except ImportError:        from twisted.internet.pollreactor import install    elif platform.getType() == 'posix' and not platform.isMacOSX():      from twisted.internet.pollreactor import install    else:      from twisted.internet.selectreactor import install  except ImportError:    from twisted.internet.selectreactor import install  return installinstall = _getInstallFunction(platform)

很明顯,default中會根據平臺獲取相應的install。Linux下會首先使用epollreactor,如果內核還不支持,就只能使用pollreactor。Mac平臺使用pollreactor,windows使用selectreactor。每種install的實現差不多,這里我們抽取selectreactor中的install來看看。

# twisted/internet/selectreactor.py:def install():  """Configure the twisted mainloop to be run using the select() reactor.  """  # 單例  reactor = SelectReactor()  from twisted.internet.main import installReactor  installReactor(reactor)# twisted/internet/main.py:def installReactor(reactor):  """  Install reactor C{reactor}.  @param reactor: An object that provides one or more IReactor* interfaces.  """  # this stuff should be common to all reactors.  import twisted.internet  import sys  if 'twisted.internet.reactor' in sys.modules:    raise error.ReactorAlreadyInstalledError("reactor already installed")  twisted.internet.reactor = reactor  sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor鍵,值就是再install中創建的單例reactor。以后要使用reactor,就會導入這個單例了。

SelectReactor# twisted/internet/selectreactor.py@implementer(IReactorFDSet)class SelectReactor(posixbase.PosixReactorBase, _extraBase)
implementer表示SelectReactor實現了IReactorFDSet接口的方法,這里用到了zope.interface,它是python中的接口實現,有興趣的同學可以去看下。

IReactorFDSet接口主要對描述符的獲取、添加、刪除等操作的方法。這些方法看名字就能知道意思,所以我就沒有加注釋。

# twisted/internet/interfaces.pyclass IReactorFDSet(Interface):  def addReader(reader):  def addWriter(writer):  def removeReader(reader):  def removeWriter(writer):  def removeAll():  def getReaders():  def getWriters():reactor.listenTCP()

示例中的reactor.listenTCP()注冊了一個監聽事件,它是父類PosixReactorBase中方法。

# twisted/internet/posixbase.py@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,            ReactorBase):  def listenTCP(self, port, factory, backlog=50, interface=''):    p = tcp.Port(port, factory, backlog, interface, self)    p.startListening()    return p# twisted/internet/tcp.py@implementer(interfaces.IListeningPort)class Port(base.BasePort, _SocketCloser):  def __init__(self, port, factory, backlog=50, interface='', reactor=None):    """Initialize with a numeric port to listen on.    """    base.BasePort.__init__(self, reactor=reactor)    self.port = port    self.factory = factory    self.backlog = backlog    if abstract.isIPv6Address(interface):      self.addressFamily = socket.AF_INET6      self._addressType = address.IPv6Address    self.interface = interface  ...  def startListening(self):    """Create and bind my socket, and begin listening on it.     創建并綁定套接字,開始監聽。    This is called on unserialization, and must be called after creating a    server to begin listening on the specified port.    """    if self._preexistingSocket is None:      # Create a new socket and make it listen      try:        # 創建套接字        skt = self.createInternetSocket()        if self.addressFamily == socket.AF_INET6:          addr = _resolveIPv6(self.interface, self.port)        else:          addr = (self.interface, self.port)        # 綁定        skt.bind(addr)      except socket.error as le:        raise CannotListenError(self.interface, self.port, le)      # 監聽      skt.listen(self.backlog)    else:      # Re-use the externally specified socket      skt = self._preexistingSocket      self._preexistingSocket = None      # Avoid shutting it down at the end.      self._shouldShutdown = False    # Make sure that if we listened on port 0, we update that to    # reflect what the OS actually assigned us.    self._realPortNumber = skt.getsockname()[1]    log.msg("%s starting on %s" % (        self._getLogPrefix(self.factory), self._realPortNumber))    # The order of the next 5 lines is kind of bizarre. If no one    # can explain it, perhaps we should re-arrange them.    self.factory.doStart()    self.connected = True    self.socket = skt    self.fileno = self.socket.fileno    self.numberAccepts = 100    # startReading調用reactor的addReader方法將Port加入讀集合    self.startReading()

整個邏輯很簡單,和正常的server端一樣,創建套接字、綁定、監聽。不同的是將套接字的描述符添加到了reactor的讀集合。那么假如有了client連接過來的話,reactor會監控到,然后觸發事件處理程序。

reacotr.run()事件主循環

# twisted/internet/posixbase.py@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,            ReactorBase)# twisted/internet/base.pyclass _SignalReactorMixin(object):  def startRunning(self, installSignalHandlers=True):    """    PosixReactorBase的父類_SignalReactorMixin和ReactorBase都有該函數,但是    _SignalReactorMixin在前,安裝mro順序的話,會先調用_SignalReactorMixin中的。    """    self._installSignalHandlers = installSignalHandlers    ReactorBase.startRunning(self)  def run(self, installSignalHandlers=True):    self.startRunning(installSignalHandlers=installSignalHandlers)    self.mainLoop()  def mainLoop(self):    while self._started:      try:        while self._started:          # Advance simulation time in delayed event          # processors.          self.runUntilCurrent()          t2 = self.timeout()          t = self.running and t2          # doIteration是關鍵,select,poll,epool實現各有不同          self.doIteration(t)      except:        log.msg("Unexpected error in main loop.")        log.err()      else:        log.msg('Main loop terminated.')

mianLoop就是最終的主循環了,在循環中,調用doIteration方法監控讀寫描述符的集合,一旦發現有描述符準備好讀寫,就會調用相應的事件處理程序。

# twisted/internet/selectreactor.py@implementer(IReactorFDSet)class SelectReactor(posixbase.PosixReactorBase, _extraBase):  def __init__(self):    """    Initialize file descriptor tracking dictionaries and the base class.    """    self._reads = set()    self._writes = set()    posixbase.PosixReactorBase.__init__(self)  def doSelect(self, timeout):    """    Run one iteration of the I/O monitor loop.    This will run all selectables who had input or output readiness    waiting for them.    """    try:      # 調用select方法監控讀寫集合,返回準備好讀寫的描述符      r, w, ignored = _select(self._reads,                  self._writes,                  [], timeout)    except ValueError:      # Possibly a file descriptor has gone negative?      self._preenDescriptors()      return    except TypeError:      # Something *totally* invalid (object w/o fileno, non-integral      # result) was passed      log.err()      self._preenDescriptors()      return    except (select.error, socket.error, IOError) as se:      # select(2) encountered an error, perhaps while calling the fileno()      # method of a socket. (Python 2.6 socket.error is an IOError      # subclass, but on Python 2.5 and earlier it is not.)      if se.args[0] in (0, 2):        # windows does this if it got an empty list        if (not self._reads) and (not self._writes):          return        else:          raise      elif se.args[0] == EINTR:        return      elif se.args[0] == EBADF:        self._preenDescriptors()        return      else:        # OK, I really don't know what's going on. Blow up.        raise    _drdw = self._doReadOrWrite    _logrun = log.callWithLogger    for selectables, method, fdset in ((r, "doRead", self._reads),                      (w,"doWrite", self._writes)):      for selectable in selectables:        # if this was disconnected in another thread, kill it.        # ^^^^ --- what the !@#*? serious! -exarkun        if selectable not in fdset:          continue        # This for pausing input when we're not ready for more.        # 調用_doReadOrWrite方法        _logrun(selectable, _drdw, selectable, method)  doIteration = doSelect  def _doReadOrWrite(self, selectable, method):    try:      # 調用method,doRead或者是doWrite,      # 這里的selectable可能是我們監聽的tcp.Port      why = getattr(selectable, method)()    except:      why = sys.exc_info()[1]      log.err()    if why:      self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客戶端有連接請求了,就會調用讀集合中tcp.Port的doRead方法。

# twisted/internet/tcp.py@implementer(interfaces.IListeningPort)class Port(base.BasePort, _SocketCloser):  def doRead(self):    """Called when my socket is ready for reading.    當套接字準備好讀的時候調用    This accepts a connection and calls self.protocol() to handle the    wire-level protocol.    """    try:      if platformType == "posix":        numAccepts = self.numberAccepts      else:        numAccepts = 1      for i in range(numAccepts):        if self.disconnecting:          return        try:          # 調用accept          skt, addr = self.socket.accept()        except socket.error as e:          if e.args[0] in (EWOULDBLOCK, EAGAIN):            self.numberAccepts = i            break          elif e.args[0] == EPERM:            continue          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):            log.msg("Could not accept new connection (%s)" % (              errorcode[e.args[0]],))            break          raise        fdesc._setCloseOnExec(skt.fileno())        protocol = self.factory.buildProtocol(self._buildAddr(addr))        if protocol is None:          skt.close()          continue        s = self.sessionno        self.sessionno = s+1        # transport初始化的過程中,會將自身假如到reactor的讀集合中,那么當它準備        # 好讀的時候,就可以調用它的doRead方法讀取客戶端發過來的數據了        transport = self.transport(skt, protocol, addr, self, s, self.reactor)        protocol.makeConnection(transport)      else:        self.numberAccepts = self.numberAccepts+20    except:      log.deferr()

doRead方法中,調用accept產生了用于接收客戶端數據的套接字,將套接字與transport綁定,然后把transport加入到reactor的讀集合。當客戶端有數據到來時,就會調用transport的doRead方法進行數據讀取了。

Connection是Server(transport實例的類)的父類,它實現了doRead方法。

# twisted/internet/tcp.py@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,         _AbortingMixin):  def doRead(self):    try:      # 接收數據      data = self.socket.recv(self.bufferSize)    except socket.error as se:      if se.args[0] == EWOULDBLOCK:        return      else:        return main.CONNECTION_LOST    return self._dataReceived(data)  def _dataReceived(self, data):    if not data:      return main.CONNECTION_DONE    # 調用我們自定義protocol的dataReceived方法處理數據    rval = self.protocol.dataReceived(data)    if rval is not None:      offender = self.protocol.dataReceived      warningFormat = (        'Returning a value other than None from %(fqpn)s is '        'deprecated since %(version)s.')      warningString = deprecate.getDeprecationWarningString(        offender, versions.Version('Twisted', 11, 0, 0),        format=warningFormat)      deprecate.warnAboutFunction(offender, warningString)    return rval

_dataReceived中調用了示例中我們自定義的EchoProtocol的dataReceived方法處理數據。

至此,一個簡單的流程,從創建監聽事件,到接收客戶端數據就此結束了。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 茌平县| 山阴县| 新乡县| 临邑县| 乌拉特中旗| 中江县| 九龙坡区| 彰化县| 龙里县| 新平| 松阳县| 德化县| 香格里拉县| 鸡西市| 新营市| 天峻县| 三都| 伽师县| 崇文区| 开平市| 东乡县| 景东| 通化县| 洪雅县| 娱乐| 肇东市| 宜昌市| 永福县| 天等县| 荆门市| 乌拉特前旗| 辽中县| 海兴县| 靖安县| 平阳县| 雷山县| 运城市| 石台县| 长丰县| 河曲县| 宽城|