<del id="nnjnj"></del><track id="nnjnj"></track>

<p id="nnjnj"></p>

<address id="nnjnj"></address>

    <pre id="nnjnj"><pre id="nnjnj"></pre></pre>

      <noframes id="nnjnj"><ruby id="nnjnj"><ruby id="nnjnj"></ruby></ruby>

      • 自動秒收錄
      • 軟件:1973
      • 資訊:56215|
      • 收錄網站:181183|

      IT精英團

      Python中的協同學 是怎么回事?

      Python中的協同學 是怎么回事?

      瀏覽次數:
      評論次數:
      編輯: 樂詠
      信息來源: ITPUB
      更新日期: 2022-04-18 21:21:57
      摘要

      以下文章來源于公眾號-博海拾貝diary,作者so1n在io比較多的場景中,Async語法編寫的程序會以更少的時間,更少的資源來完成相同的任務,這篇文章則是介紹了Python的Async語法的協

      • 正文開始
      • 相關閱讀
      • 推薦作品

      以下文章來自微信官方賬號so1n的渤海撿殼日記。

      在io較多的場景下,異步文法編寫的程序會用更少的時間和更少的資源完成同樣的任務。本文介紹了Python異步語法的協同過程是如何實現的。

      1.傳統同步語法請求示例

      同樣,在理解異步語法的實現之前,先從Sync的語法示例開始?,F在假設有一個HTTP請求,這個程序會通過這個請求得到相應的響應內容并打印出來。代碼如下:

      導入套接字

      defrequest(主機:str)-None:

      ''模擬請求并打印響應正文'''

      url:str=f'http://{host} '

      sock:socket。SocketType=socket.socket()

      sock.connect((host,80))

      sock . send(f ' get { URL } HTTP/1.0 \ r \ n host : { host } \ r \ n \ r \ n '。編碼(' ascii '))

      response_bytes:bytes=b ' '

      chunk:bytes=sock.recv(4096)

      whilechunk:

      響應字節=塊

      chunk=sock.recv(4096)

      打印(' \n '。join([iforiinresponse _ bytes . decode()。拆分(' \r\n '))))

      if__name__=='__main__':

      請求(' so1n.me ')

      運行程序,程序可以正常輸出。上半部分打印相應的HTTP響應頭,下半部分打印HTTP響應體??梢钥吹椒掌髯屛覀円詇ttps的形式再次請求,輸出結果如下:

      HTTP/1.1301永久移動

      Server:GitHub.com

      Content-Type:text/html

      Location:https://so1n.me/

      x-GitHub-Request-id : a 744:3871:4136 af :48 bd9f :6188 db 50

      內容長度:162

      接受范圍:字節

      日期:年01月08日02時10分8:11:37GMT

      Via:1.1varnish清漆

      年齡:104

      連接:關閉

      x-Served-by : cache-qpg 1272-QPG

      X-Cache:HIT

      x緩存命中率:2

      x定時器:S1636359097.026094,VS0,VE0

      Vary:Accept接受編碼

      x-fast ly-Request-id :22 fa 337 f 777553d 33503 CEE 5282598 c6a 293 fb5e

      超文本標記語言

      標題301永久移動/標題/標題

      身體

      <center><h1>301 Moved Permanently</h1></center>
      <hr><center>nginx</center>
      </body>
      </html>

      不過這里并不是想說HTTP請求是如何實現的, 具體我也不太了解, 在這個代碼中, socket的默認調用是阻塞的, 當線程調用connect或者recv時(send是不用等待的, 但在高并發下需要先等待drain后才可以send, 小demo不需要用到drain方法), 程序將會暫停直到操作完成。當一次要下載很多網頁的話, 這將會如上篇文章所說的一樣, 大部分的等待時間都花在io上面, cpu卻一直空閑時, 而使用線程池雖然可以解決這個問題, 但是開銷是很大的, 同時操作系統往往會限制一個進程,用戶或者機器可以使用的線程數, 而協程卻沒有這些限制, 占用的資源少, 也沒有系統限制瓶頸。

      2.異步的請求

      異步可以讓一個單獨的線程處理并發的操作, 不過在上面已經說過了, socket是默認阻塞的, 所以需要把socket設置為非阻塞的, socket提供了setblocking這個方法供開發者選擇是否阻塞, 在設置了非阻塞后, connectrecv方法也要進行更改。

      由于沒有了阻塞, 程序在調用了connect后會馬上返回, 只不過Python的底層是C, 這段代碼在C中調用非阻塞的socket.connect后會拋出一個異常, 我們需要捕獲它, 就像這樣:

      import socket

      sock: socket.SocketType = socket.socket()
      sock.setblocking(Flase)
      try:
          sock.connect(("so1n.me", 80))
      except BlockingIOError:
          pass

      經過一頓操作后, 就開始申請建立連接了, 但是我們還不知道連接啥時候完成建立, 由于連接沒建立時調用send會報錯, 所以可以一直輪詢調用send直到沒報錯就認為是成功(真實代碼需要加超時):

      while True:
          try: 
              sock.send(request)
              break
          except OSError as e:
              pass

      但是這樣讓CPU空轉太浪費性能了, 而且期間還不能做別的事情, 就像我們點外賣后一直打電話過去問飯菜做好了沒有, 十分浪費電話費用, 要是飯菜做完了就打電話告訴我們, 那就只產生了一筆費用, 非常的省錢(正常情況下也是這樣子)。這時就需要事件循環登場了,在類UNIX中, 有一個叫select的功能, 它可以等待事件發生后再調用監聽的函數, 不過一開始的實現性能不是很好, 在Linux上被epoll取代, 不過接口是類似的, 所在在Python中把這幾個不同的事件循環都封裝在selectors庫中, 同時可以通過DefaultSelector從系統中挑出最好的類select函數。這里先暫時不說事件循環的原理, 事件循環最主要的是他名字的兩部分, 一個是事件, 一個是循環, 在Python中, 可以通過如下方法把事件注冊到事件循環中:

      def demo(): pass

      selector.register(fd, EVENT_WRITE, demo)

      這樣這個事件循環就會監聽對應的文件描述符fd, 當這個文件描述符觸發寫入事件(EVENT_WRITE)時,事件循環就會告訴我們可以去調用注冊的函數demo。不過如果把上面的代碼都改為這種方法去運行的話就會發現, 程序好像沒跑就結束了, 但程序其實是有跑的, 只不過他們是完成的了注冊, 然后就等待開發者接收事件循環的事件進行下一步的操作, 所以我們只需要在代碼的最后面寫上如下代碼:

      while True:
          for key, mask in selector.select():
              key.data()

      這樣程序就會一直運行, 當捕獲到事件的時候, 就會通過for循環告訴我們, 其中key.data是我們注冊的回調函數, 當事件發生時, 就會通知我們, 我們可以通過拿到回調函數然后就運行, 了解完畢后, 我們可以來編寫我們的第一個并發程序, 他實現了一個簡單的I/O復用的小邏輯, 代碼和注釋如下:

      import socket
      from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


      # 選擇事件循環
      selector: DefaultSelector = DefaultSelector()
      # 用于判斷是否有事件在運行
      running_cnt: int = 


      def request(host: str) -> None:
          """模擬請求并打印響應體"""
          # 告訴主函數, 自己的事件還在運行
          global running_cnt
          running_cnt += 1
          
          # 初始化socket
          url: str = f"http://{host}"
          sock: socket.SocketType = socket.socket()
          sock.setblocking(False)
          try:
              sock.connect((host, 80))
          except BlockingIOError:
              pass

          response_bytes: bytes = b""

          def read_response() -> None:
              """接收響應參數, 并判斷請求是否結束"""
              nonlocal response_bytes
              chunk: bytes = sock.recv(4096)
              print(f"recv {host} body success")
              if chunk:
                  response_bytes += chunk
              else:
                  # 沒有數據代表請求結束了, 注銷監聽
                  selector.unregister(sock.fileno())
                  global running_cnt
                  running_cnt -= 1

          def connected() -> None:
              """socket建立連接時的回調"""
              # 取消監聽
              selector.unregister(sock.fileno())
              print(f"{host} connect success")
              # 發送請求, 并監聽讀事件, 以及注冊對應的接收響應函數
              sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
              selector.register(sock.fileno(), EVENT_READ, read_response)

          selector.register(sock.fileno(), EVENT_WRITE, connected)


      if __name__ == "__main__":
          # 同時多個請求
          request("so1n.me")
          request("github.com")
          request("google.com")
          request("baidu.com")
          # 監聽是否有事件在運行
          while running_cnt > :
              # 等待事件循環通知事件是否已經完成
              for key, mask in selector.select():
                  key.data()

      這段代碼接近同時注冊了4個請求并注冊建立連接回調, 然后就進入事件循環邏輯, 也就是把控制權交給事件循環, 直到事件循環告訴程序說收到了socket建立的通知, 程序就會取消注冊的回調然后發送請求, 并注冊一個讀的事件回調, 然后又把控制權交給事件循環, 直到收到了響應的結果才進入處理響應結果函數并且只有收完所有響應結果才會退出程序。下面是我其中的一次執行結果

      so1n.me connect success
      github.com connect success
      google.com connect success
      recv google.com body success
      recv google.com body success
      baidu.com connect success
      recv github.com body success
      recv github.com body success
      recv baidu.com body success
      recv baidu.com body success
      recv so1n.me body success
      recv so1n.me body success

      可以看到他們的執行順序是隨機的, 不是嚴格的按照so1n.me, github.com, google.com, baidu.com順序執行, 同時他們執行速度很快, 這個程序的耗時約等于響應時長最長的函數耗時。但是可以看出, 這個程序里面出現了兩個回調, 回調會讓代碼變得非常的奇怪, 降低可讀性, 也容易造成回調地獄, 而且當回調發生報錯的時候, 我們是很難知道這是由于什么導致的錯誤, 因為它的上下文丟失了, 這樣子排查問題十分的困惑。作為程序員, 一般都不止滿足于速度快的代碼, 真正想要的是又快, 又能像Sync的代碼一樣簡單, 可讀性強, 也能容易排查問題的代碼, 這種組合形式的代碼的設計模式就叫協程。

      協程出現得很早, 它不像線程一樣, 被系統調度, 而是能自主的暫停, 并等待事件循環通知恢復。由于協程是軟件層面實現的, 所以它的實現方式有很多種, 這里要說的是基于生成器的協程, 因為生成器跟協程一樣, 都有暫停讓步和恢復的方法(還可以通過throw來拋錯), 同時它跟Async語法的協程很像, 通過了解基于生成器的協程, 可以了解Async的協程是如何實現的。

      3.基于生成器的協程

      3.1生成器

      在了解基于生成器的協程之前, 需要先了解下生成器, Python的生成器函數與普通的函數會有一些不同, 只有普通函數中帶有關鍵字yield, 那么它就是生成器函數, 具體有什么不同可以通過他們的字節碼來了解:

      In [1]: import dis

      # 普通函數
      In [2]: def aaa(): pass

      In [3]: dis.dis(aaa)
                                                                                                                                                                                      
        1            LOAD_CONST                (None)
                    2 RETURN_VALUE

      # 普通函數調用函數
      In [4]: def bbb(): 
         ...:     aaa() 
         ...:


      In [5]: dis.dis(bbb)

        2            LOAD_GLOBAL               (aaa)
                    2 CALL_FUNCTION            
                    4 POP_TOP
                    6 LOAD_CONST                (None)
                    8 RETURN_VALUE

      # 普通生成器函數
      In [6]: def ccc(): yield

      In [7]: dis.dis(ccc)
                                                                                                                                                                                      
        1            LOAD_CONST                (None)
                    2 YIELD_VALUE
                    4 POP_TOP
                    6 LOAD_CONST                (None)
                    8 RETURN_VALUE

      上面分別是普通函數, 普通函數調用函數和普通生成器函數的字節碼, 從字節碼可以看出來, 最簡單的函數只需要LOAD_CONST來加載變量None壓入自己的棧, 然后通過RETURN_VALUE來返回值, 而有函數調用的普通函數則先加載變量, 把全局變量的函數aaa加載到自己的棧里面, 然后通過CALL_FUNCTION來調用函數, 最后通過POP_TOP把函數的返回值從棧里拋出來, 再把通過LOAD_CONST把None壓入自己的棧, 最后返回值。而生成器函數則不一樣, 它會先通過LOAD_CONST來加載變量None壓入自己的棧, 然后通過YIELD_VALUE返回值, 接著通過POP_TOP彈出剛才的棧并重新把變量None壓入自己的棧, 最后通過RETURN_VALUE來返回值。從字節碼來分析可以很清楚的看到, 生成器能夠在yield區分兩個棧幀, 一個函數調用可以分為多次返回, 很符合協程多次等待的特點。

      接著來看看生成器的一個使用, 這個生成器會有兩次yield調用, 并在最后返回字符串'None', 代碼如下:

      In [8]: def demo(): 
         ...:     a = 1 
         ...:     b = 2 
         ...:     print('aaa', locals()) 
         ...:     yield 1 
         ...:     print('bbb', locals()) 
         ...:     yield 2 
         ...:     return 'None' 
         ...:
                                                                                                                                                                                                         
      In [9]: demo_gen = demo()

      In [10]: demo_gen.send(None)

      aaa {'a': 1, 'b': 2}
      Out[10]: 1

      In [11]: demo_gen.send(None)

      bbb {'a': 1, 'b': 2}
      Out[11]: 2

      In [12]: demo_gen.send(None)

      ---------------------------------------------------------------------------
      StopIteration                             Traceback (most recent call last)
      <ipython-input-12-8f8cb075d6af> in <module>
      ----> 1 demo_gen.send(None)

      StopIteration: None

      這段代碼首先通過函數調用生成一個demo_gen的生成器對象, 然后第一次send調用時返回值1, 第二次send調用時返回值2, 第三次send調用則拋出StopIteration異常, 異常提示為None, 同時可以看到第一次打印aaa和第二次打印bbb時, 他們都能打印到當前的函數局部變量, 可以發現在即使在不同的棧幀中, 他們讀取到當前的局部函數內的局部變量是一致的, 這意味著如果使用生成器來模擬協程時, 它還是會一直讀取到當前上下文的, 非常的完美。

      此外, Python還支持通過yield from語法來返回一個生成器, 代碼如下:

      In [1]: def demo_gen_1(): 
         ...:     for i in range(3): 
         ...:         yield i 
         ...:

      In [2]: def demo_gen_2(): 
         ...:     yield from demo_gen_1() 
         ...:

      In [3]: demo_gen_obj = demo_gen_2()

      In [4]: demo_gen_obj.send(None)
                                                                                                                                                                           
      Out[4]: 

      In [5]: demo_gen_obj.send(None)

      Out[5]: 1

      In [6]: demo_gen_obj.send(None)

      Out[6]: 2

      In [7]: demo_gen_obj.send(None)
                                                                                                                                          
      ---------------------------------------------------------------------------
      StopIteration                             Traceback (most recent call last)
      <ipython-input-7-f9922a2f64c9> in <module>
      ----> 1 demo_gen_obj.send(None)

      StopIteration: 


      通過yield from就可以很方便的支持生成器調用, 假如把每個生成器函數都當做一個協程, 那通過yield from就可以很方便的實現協程間的調用, 此外生成器的拋出異常后的提醒非常人性化, 也支持throw來拋出異常, 這樣我們就可以實現在協程運行時設置異常, 比如Cancel,演示代碼如下:

      In [1]: def demo_exc(): 
         ...:     yield 1 
         ...:     raise RuntimeError() 
         ...:

      In [2]: def demo_exc_1(): 
         ...:     for i in range(3): 
         ...:         yield i 
         ...:

      In [3]: demo_exc_gen = demo_exc()

      In [4]: demo_exc_gen.send(None)
                                                                                                                                                                           
      Out[4]: 1

      In [5]: demo_exc_gen.send(None)
                                                                                                                                           
      ---------------------------------------------------------------------------
      RuntimeError                              Traceback (most recent call last)
      <ipython-input-5-09fbb75fdf7d> in <module>
      ----> 1 demo_exc_gen.send(None)

      <ipython-input-1-69afbc1f9c19> in demo_exc()
            1 def demo_exc():
            2     yield 1
      ----> 3     raise RuntimeError()
            4 

      RuntimeError: 

      In [6]: demo_exc_gen_1 = demo_exc_1()

      In [7]: demo_exc_gen_1.send(None)
                                                                                                                                              Out[7]: 

      In [8]: demo_exc_gen_1.send(None)
                                                                                                                                              Out[8]: 1

      In [9]: demo_exc_gen_1.throw(RuntimeError)
                                                                                                                                              ---------------------------------------------------------------------------
      RuntimeError                              Traceback (most recent call last)
      <ipython-input-9-1a1cc55d71f4> in <module>
      ----> 1 demo_exc_gen_1.throw(RuntimeError)

      <ipython-input-2-2617b2366dce> in demo_exc_1()
            1 def demo_exc_1():
            2     for i in range(3):
      ----> 3         yield i
            4 

      RuntimeError: 

      從中可以看到在運行中拋出異常時, 會有一個非常清楚的拋錯, 可以明顯看出錯誤堆棧, 同時throw指定異常后, 會在下一處yield拋出異常(所以協程調用Cancel后不會馬上取消, 而是下一次調用的時候才被取消)。

      3.2用生成器實現協程

      我們已經簡單的了解到了生成器是非常的貼合協程的編程模型, 同時也知道哪些生成器API是我們需要的API, 接下來可以模仿Asyncio的接口來實現一個簡單的協程。

      首先是在Asyncio中有一個封裝叫Feature, 它用來表示協程正在等待將來時的結果, 以下是我根據asyncio.Feature封裝的一個簡單的Feature, 它的API沒有asyncio.Feature全, 代碼和注釋如下:

      class Status:
          """用于判斷Future狀態"""
          pending: int = 1
          finished: int = 2
          cancelled: int = 3


      class Future(object):

          def __init__(self) -> None:
              """初始化時, Feature處理pending狀態, 等待set result"""
              self.status: int = Status.pending
              self._result: Any = None
              self._exception: Optional[Exception] = None
              self._callbacks: List[Callable[['Future'], None]] = []

          def add_done_callback(self, fn: [['Future'], None]Callable) -> None:
              """添加完成時的回調"""
              self._callbacks.append(fn)

          def cancel(self):
              """取消當前的Feature"""
              if self.status != Status.pending:
                  return False
              self.status = Status.cancelled
              for fn in self._callbacks:
                  fn(self)
              return True

          def set_exception(self, exc: Exception) -> None:
              """設置異常"""
              if self.status != Status.pending:
                  raise RuntimeError("Can not set exc")
              self._exception = exc
              self.status = Status.finished

          def set_result(self, result: Any) -> None:
              """設置結果"""
              if self.status != Status.pending:
                  raise RuntimeError("Can not set result")
              self.status = Status.finished
              self._result = result
              for fn in self._callbacks:
                  fn(self)

          def result(self):
              """獲取結果"""
              if self.status == Status.cancelled:
                  raise asyncio.CancelledError
              elif self.status != Status.finished:
                  raise RuntimeError("Result is not read")
              elif self._exception is not None:
                  raise self._exception
              return self._result


          def __iter__(self):
              """通過生成器來模擬協程, 當收到結果通知時, 會返回結果"""
              if self.status == Status.pending:
                  yield self
              return self.result()

      在理解Future時, 可以把它假想為一個狀態機, 在啟動初始化的時候是peding狀態, 在運行的時候我們可以切換它的狀態, 并且通過__iter__方法來支持調用者使用yield from Future()來等待Future本身, 直到收到了事件通知時, 可以得到結果。

      但是可以發現這個Future是無法自我驅動, 調用了__iter__的程序不知道何時被調用了set_result, 在Asyncio中是通過一個叫Task的類來驅動Future, 它將一個協程的執行過程安排好, 并負責在事件循環中執行該協程。它主要有兩個方法:

      • 1.初始化時, 會先通過send方法激活生成器
      • 2.后續被調度后馬上安排下一次等待, 除非拋出StopIteration異常

      還有一個支持取消運行托管協程的方法(在原代碼中, Task是繼承于Future, 所以Future有的它都有), 經過簡化后的代碼如下:

      class Task:
          def __init__(self, coro: Generator) -> None:
              # 初始化狀態
              self.cancelled: bool = False
              self.coro: Generator = coro
              # 預激一個普通的future
              f: Future = Future()
              f.set_result(None)
              self.step(f)

          def cancel(self) -> None:
              """用于取消托管的coro"""
              self.coro.throw(asyncio.CancelledError)

          def step(self, f: Future) -> None:
              """用于調用coro的下一步, 從第一次激活開始, 每次都添加完成時的回調, 直到遇到取消或者StopIteration異常"""
              try:
                  _future = self.coro.send(f.result())
              except asyncio.CancelledError:
                  self.cancelled = True
                  return
              except StopIteration:
                  return

              _future.add_done_callback(self.step)

      這樣FutureTask就封裝好了, 可以簡單的試一試效果如何:

      In [2]:def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]: 
         ...:    result = yield from f 
         ...:    print(flag_int, result) 
         ...:
         ...:future: Future = Future() 
         ...:for i in range(3): 
         ...:    coro = wait_future(future, i) 
         ...:    # 托管wait_future這個協程, 里面的Future也會通過yield from被托管 
         ...:    Task(coro) 
         ...:
         ...:print('ready') 
         ...:future.set_result('ok') 
         ...:
         ...:future = Future() 
         ...:Task(wait_future(future, 3)).cancel() 
         ...: 
                                                                                                                                              ready
       ok
      1 ok
      2 ok
      ---------------------------------------------------------------------------
      CancelledError                            Traceback (most recent call last)
      <ipython-input-2-2d1b04db2604> in <module>
           12 
           13 future = Future()
      ---> 14 Task(wait_future(future, 3)).cancel()

      <ipython-input-1-ec3831082a88> in cancel(self)
           81 
           82     def cancel(self) -> None:
      ---> 83         self.coro.throw(asyncio.CancelledError)
           84 
           85     def step(self, f: Future) -> None:

      <ipython-input-2-2d1b04db2604> in wait_future(f, flag_int)
            1 def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
      ----> 2     result = yield from f
            3     print(flag_int, result)
            4 
            5 future: Future = Future()

      <ipython-input-1-ec3831082a88> in __iter__(self)
           68         """通過生成器來模擬協程, 當收到結果通知時, 會返回結果"""
           69         if self.status == Status.pending:
      ---> 70             yield self
           71         return self.result()
           72 

      CancelledError: 

      這段程序會先初始化Future, 并把Future傳給wait_future并生成生成器, 再交由給Task托管, 預激,  由于Future是在生成器函數wait_future中通過yield from與函數綁定的, 真正被預激的其實是Future__iter__方法中的yield self, 此時代碼邏輯會暫停在yield self并返回。在全部預激后, 通過調用Futureset_result方法, 使Future變為結束狀態, 由于set_result會執行注冊的回調, 這時它就會執行托管它的Taskstep方法中的send方法, 代碼邏輯回到Future__iter__方法中的yield self, 并繼續往下走, 然后遇到return返回結果, 并繼續走下去, 從輸出可以發現程序封裝完成且打印了ready后, 會依次打印對應的返回結果, 而在最后一個的測試cancel方法中可以看到,Future拋出異常了, 同時這些異常很容易看懂, 能夠追隨到調用的地方。

      現在FutureTask正常運行了, 可以跟我們一開始執行的程序進行整合, 代碼如下:

      class HttpRequest(object):
          def __init__(self, host: str):
              """初始化變量和sock"""
              self._host: str = host
              global running_cnt
              running_cnt += 1
              self.url: str = f"http://{host}"
              self.sock: socket.SocketType = socket.socket()
              self.sock.setblocking(False)
              try:
                  self.sock.connect((host, 80))
              except BlockingIOError:
                  pass

          def read(self) -> Generator[Future, None, bytes]:
              """從socket獲取響應數據, 并set到Future中, 并通過Future.__iter__方法或得到數據并通過變量chunk_future返回"""
              f: Future = Future()
              selector.register(self.sock.fileno(), EVENT_READ, lambda: f.set_result(self.sock.recv(4096)))
              chunk_future = yield from f
              selector.unregister(self.sock.fileno())
              return chunk_future  # type: ignore

          def read_response(self) -> Generator[Future, None, bytes]:
              """接收響應參數, 并判斷請求是否結束"""
              response_bytes: bytes = b""
              chunk = yield from self.read()
              while chunk:
                  response_bytes += chunk
                  chunk = yield from self.read()
              return response_bytes

          def connected(self) -> Generator[Future, None, None]:
              """socket建立連接時的回調"""
              # 取消監聽
              f: Future = Future()
              selector.register(self.sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))
              yield f
              selector.unregister(self.sock.fileno())
              print(f"{self._host} connect success")

          def request(self) -> Generator[Future, None, None]:
              # 發送請求, 并監聽讀事件, 以及注冊對應的接收響應函數
              yield from self.connected()
              self.sock.send(f"GET {self.url} HTTP/1.0\r\nHost: {self._host}\r\n\r\n".encode("ascii"))
              response = yield from self.read_response()
              print(f"request {self._host} success, length:{len(response)}")
              global running_cnt
              running_cnt -= 1


      if __name__ == "__main__":
          # 同時多個請求
          Task(HttpRequest("so1n.me").request())
          Task(HttpRequest("github.com").request())
          Task(HttpRequest("google.com").request())
          Task(HttpRequest("baidu.com").request())
          # 監聽是否有事件在運行
          while running_cnt > :
              # 等待事件循環通知事件是否已經完成
              for key, mask in selector.select():
                  key.data()

      這段代碼通過Future和生成器方法盡量的解耦回調函數, 如果忽略了HttpRequest中的connectedread方法則可以發現整段代碼跟同步的代碼基本上是一樣的, 只是通過yieldyield from交出控制權和通過事件循環恢復控制權。同時通過上面的異常例子可以發現異常排查非常的方便, 這樣一來就沒有了回調的各種糟糕的事情, 開發者只需要按照同步的思路進行開發即可, 不過我們的事件循環是一個非常簡單的事件循環例子, 同時對于socket相關都沒有進行封裝, 也缺失一些常用的API, 而這些都會被Python官方封裝到Asyncio這個庫中, 通過該庫, 我們可以近乎完美的編寫Async語法的代碼。

      NOTE: 由于生成器協程中無法通過yield from語法使用生成器, 所以Python在3.5之后使用了Await的原生協程。



      代碼規范設計模式的落地之路
      ? 上一篇 2022-04-18
      運行和維護監控中的采樣數據和統計數據
      下一篇 ? 2022-04-18
      • 胡迪核心知識點詳解(好文章合集)
        1閱讀 0條評論 個贊
        以下文章來源于公眾號-3分鐘秒懂大數據,作者在IT中穿梭旅行在Flink實時流中,經常會通過FlinkCDC插件讀取Mysql數據,然后寫入Hudi中。所以在執行上述操作時,需要了解……
      • 前端面試必須解決網絡中的跨域問題
        0閱讀 0條評論 個贊
        什么是跨域瀏覽器有一個重要的安全策略,稱之為「同源策略」其中,源=協議+主機+端口源=協議+主機+端口源=協議+主機+端口,兩個源相同,稱之為同源,兩個源不同,稱之為跨源或跨域比如:源1源2是否同……
      • 如何在Bash腳本中使用強大的Linux測試命令
        0閱讀 0條評論 個贊
        Linuxtest命令是Shell內置命令,用來檢測某個條件是否成立。test通常和if語句一起使用,并且大部分if語句都依賴test??梢詫⒁粋€元素與另一個元素進行比較,但它更?!?/div>
      • 真正的建筑設計是什么樣子的?
        1閱讀 0條評論 個贊
        什么是架構和架構本質在軟件行業,對于什么是架構,都有很多的爭論,每個人都有自己的理解。此君說的架構和彼君理解的架構未必是一回事。因此我們在討論架構之前,我們先討論架構的概念定義,概念是人認識這個世界的……
      • 10分鐘了解云原生 值得收藏~
        0閱讀 0條評論 個贊
        文章轉載:奇妙的Linux世界我們已經進入云計算下半場,不再像上半場在糾結要不要上云,而是討論怎么上云?才能把云計算的價值發揮到淋漓盡致。如何把云計算與不同的業務場景深度結合?如何讓技術真正作用于企業……
      發表評論 共有條評論
      用戶名: 密碼:
      驗證碼: 匿名發表
      • 基礎鞏固——至少需要多少行代碼才能實現深度復制?
        1閱讀 0條評論 個贊
        前言深度克?。ㄉ羁截悾┮恢倍际浅?、中級前端面試中經常被問到的題目,網上介紹的實現方式也都各有千秋,大體可以概括為三種方式:JSON.stringify+JSON.parse,這個很好理解;全量判斷類……
      • 前端面試必須解決網絡中的跨域問題
        0閱讀 0條評論 個贊
        什么是跨域瀏覽器有一個重要的安全策略,稱之為「同源策略」其中,源=協議+主機+端口源=協議+主機+端口源=協議+主機+端口,兩個源相同,稱之為同源,兩個源不同,稱之為跨源或跨域比如:源1源2是否同……
      • 運維 放下監控——也放下自己
        0閱讀 0條評論 個贊
        來源公眾號:云原生技術應用(ID:sreguide)根據多年和運維打交道的經歷,我發現,運維常常讓監控變得無效。。。1.我的監控故事我做過兩年多的運維工作,后面就轉做運維平臺開發了,也一步步看著監控……
      • 碼頭工人搭建彈性搜索集群教程
        1閱讀 0條評論 個贊
        寫在前面:為什么要用ElasticSearch?我們的應用經常需要添加檢索功能,開源的ElasticSearch是目前全文檢索引擎的首選。它可以快速的存儲、搜索和分析海量數據。ElasticSear……
      • 說說春云的全鏈路灰度發布方案~
        1閱讀 0條評論 個贊
        以下文章來源于公眾號-碼猿技術專欄,作者不才陳某大家好實際生產中如有需求變更,并不會直接更新線上服務,最通常的做法便是:切出線上的小部分流量進行體驗測試,經過測試后無問題則全面的上線。這樣做的好處也是……
      • SQL優化通用公式:5個步驟和10個案例
        1閱讀 0條評論 個贊
        導讀:在應用開發的早期,數據量少,開發人員開發功能時更重視功能上的實現,隨著生產數據的增長,很多SQL語句開始暴露出性能問題,對生產的影響也越來越大,有時可能這些有問題的SQL就是整個系統性能的瓶頸?!?/div>
      • 7 種提升Spring Boot吞吐量神技!
        0閱讀 0條評論 個贊
        一、異步執行實現方式二種:1.使用異步注解@aysnc、啟動類:添加@EnableAsync注解2.JDK8本身有一個非常好用的Future類——CompletableFuture@AllArg……
      • MySQL支持哈希索引嗎?(收藏)
        1閱讀 0條評論 個贊
        經常有朋友問,MySQL的InnoDB到底支不支持哈希索引?對于InnoDB的哈希索引,確切的應該這么說:(1)InnoDB用戶無法手動創建哈希索引,這一層上說,InnoDB確實不支持哈希索引;(2)……
      • 用Ansible實現MySQL的備份、操作和維護
        0閱讀 0條評論 個贊
        作者簡介曹杰,中國結算上海分公司高級經理,從事系統運維管理工作。本文以容器形式部署了開源自動化運維工具Ansible,基于自帶的MySQL管理模塊編排了playbook配置文件,最終實現M……
      • Redis過期的數據會被立即刪除嗎?
        1閱讀 0條評論 個贊
        本文來源碼哥字節(ID:MageByte)?碼哥,當key達到過期時間,Redis就會馬上刪除么?先說結論:并不會立馬刪除。Redis有兩種刪除過期數據的策略:定期選取部分數據刪除;惰性刪除;……
      • 本文將帶您了解kubernetes的架構和組件!
        1閱讀 0條評論 個贊
        kubernetes架構目標kubernetes是生產級的,用于跨主機部署,擴展,管理和組合應用程序容器的基礎設施。kubernetes不僅僅是“容器編排”,他更加主要的解決方向是消除協調計算資源,網……
      • Linux的10個最危險的命令
        0閱讀 0條評論 個贊
        rm-rf命令該命令可能導致不可恢復的系統崩壞。>rm-rf/#強制刪除根目錄下所有東西。>rm-rf*#強制刪除當前目錄的所有文件。>rm-rf.#強制刪除當前文件夾及其子文件夾。執行rm-r……
      • Linux最常用的命令:解決95%以上的問題
        1閱讀 0條評論 個贊
        Linux是目前應用最廣泛的服務器操作系統,基于Unix,開源免費,由于系統的穩定性和安全性,市場占有率很高,幾乎成為程序代碼運行的最佳系統環境。linux不僅可以長時間的運行我們編寫的程序代碼,還可……
      • 代碼| C語言根據可執行文件名獲取進程運行信息
        1閱讀 0條評論 個贊
        如下示例可根據可執行文件名獲得線程數、虛擬內存占用大小、物理內存占用大小、進程PID、CPU占用率和進程啟動次數等信息。1.程序源碼main.c:#include#include<……
      • MySQL 8.0如何創建規范表
        1閱讀 0條評論 個贊
        這一節內容,基于MySQL8.0版本,聊一下如何創建一張規范的表。首先貼出一張相對規范的表結構:CREATETABLEstudent_info(`id`INTNOTNULLAUTO_IN……
      • MYSQL VS POLARDB唯一索引死鎖及應用設計
        1閱讀 0條評論 個贊
        #issue68021MySQLuniquecheck問題-知乎(zhihu.com)事情的開始是這樣的,最近和阿里云密切聯系,也成為他們的大客戶,(我們當然是大客戶,BIGBIG……
      • MySQL 5.7 和 8.0 幾處細節上的差異
        9閱讀 0條評論 個贊
        MySQL8.0相對于MySQL5.7,有很多新特性,比如:快速加列、原子DDL、不可見索引、額外端口、角色管理等。這一節內容,就不講這些新特性了,只來聊聊最近在工作學習過程中遇到的幾處細節……
      • 運維常用的34個Linux Shell腳本 對你一定有幫助!
        1閱讀 0條評論 個贊
        作為一名Linux工程師,會寫好的腳本不僅能提高工作效率,還能有更多的時間做自己的事。最近在網上沖浪的時候,也注意收集一些大佬寫過的腳本,匯總整理一下,歡迎收藏,與君共勉?。?)用戶猜數字#!/b……
      • Linux中的交互式進程查看命令htop
        1閱讀 0條評論 個贊
        htop界面展示HtopLinux進程監控工具“Htop是一個用于Linux/類Unix系統的交互式實時進程監控應用程序,也是top命的替代品,它是所有Linux操作系統上預裝的默……
      • Docker 從入門到實戰 未雨綢繆收藏~
        1閱讀 0條評論 個贊
        一、概述1.1基本概念Docker是一個開源的應用容器引擎,基于Go語言并遵從Apache2.0協議開源。Docker可以讓開發者打包他們的應用以及依賴包到一個輕量級、可移植的容器中……
      • 知網涉嫌壟斷?市場監管總局回應!人民網評論來了
        1閱讀 0條評論 個贊
        對此,人民網評論稱,知網屢受消費方詬病,頻惹眾怒,甚至引發涉嫌行業壟斷的質疑,按理早該深刻反思自身行為的合理性。收取高額費用、連年漲價,表面上是經營方式問題,但從深層次上看是相關企業缺乏對市場的尊重、對作者和用戶的尊重、對科學事業的尊重?!?/div>
      • 做SQL性能優化真的是干瞪眼
        1閱讀 0條評論 個贊
        很多大數據計算都是用SQL實現的,跑得慢時就要去優化SQL,但常常碰到讓人干瞪眼的情況。比如,存儲過程中有三條大概形如這樣的語句執行得很慢:selecta,b,sum(x)fromTgr……
      • 小米上市以來首次季度營收下滑:Q1凈虧損5.9億元 手機出貨量下滑22%
        1閱讀 0條評論 個贊
        小米集團第一季度報告其收入下降,原因是新冠肺炎疫情控制和封鎖影響了需求,而包括俄烏戰爭在內的更廣泛的經濟阻力增加了成本。這也是小米公司有記錄以來的首次季度收入下降。Refinitiv數據顯示,截至3月31日的季度收入從上年同期的768.8億元降至733.5億元(合108.5億美元),低于分析師預期的743億元?!?/div>
      • 關于敦促非法集資犯罪分子限期投案自首的通告
        1閱讀 0條評論 個贊
        通告稱,非法集資犯罪人員應當自本通告下發之日起30日內主動到公安機關投案自首。鼓勵未到案涉案人員親友積極規勸其盡快投案自首,經親友規勸投案的,或者親友主動報案并將涉案人員送至公安機關投案,且能夠如實供述罪行的,視為投案自首?!?/div>
      • 密集增資小貸 互聯網平臺有什么好說的?
        1閱讀 0條評論 個贊
        業內稱,今年以來監管對小貸、融擔公司等業務在收緊?!墩髑笠庖姼濉坊驎壕彸雠_,但窗口指導下,實際上一些已地方收緊,小貸必須增資且定價符合監管要求才能開展業務?!?/div>
      • 華融小金擬換帥 一把手敲定
        2閱讀 0條評論 個贊
        兩到三年內做到行業前五?……
      • 蘋果警告本季度虧損高達80億美元:Q1大中華區營收183億美元增長3.5%
        1閱讀 0條評論 個贊
        消費電子巨頭蘋果(Apple)公布了好于預期的2022財年第二財季業績,但蘋果高管警告稱,由于供應鏈短缺和中國工廠關閉等不利因素,該集團本季度可能會遭受高達80億美元的打擊,突顯出這家全球最具價值公司面臨的疫情挑戰遠未結束。周五消息傳出后,蘋果股價下跌?!?/div>
      • 實際年利率遠超承諾 收取2.3%服務費 “網銀”離不開貸款中介?
        1閱讀 0條評論 個贊
        近日,1818黃金眼報道,周老板在杭州開設備廠。一個多月前,她接到一個電話,對方問她有沒有融資需求。周老板說,當時還真的有這個需求,雙方一拍即合?,F在,她覺得自己過于輕信對方了,因為還款時發現年化利率是16.2%,當初說的是5%到5.3%?!?/div>
      • 蘇寧消費金融擬更換總經理
        3閱讀 0條評論 個贊
        整體團隊維持穩定?!?/div>
      • 福成股份財務總監失聯拒簽年報一季報 企業檢查顯示公司控制多家企業
        17閱讀 0條評論 個贊
        近日,福成股份(600965)發布一份《報警證明》公告,財務總監程靜于4月27日晚失聯,4月29日,公司方面向公安部門報警,目前公司已聯系上程靜?!?/div>
      • 職稱:本科年薪13萬 博士年薪30萬 茅臺集團再次下大力氣招聘 仍需“體能測試”
        0閱讀 0條評論 個贊
        茅臺集團又大手筆招人啦!……
      • Java原子變量中set()和lazySet()的區別
        1閱讀 0條評論 個贊
        來源|Java技術指北(ID:javanorth)在本教程中,我們將講講Javaatomic類(如AtomicInteger和AtomicReference)的方法set()和l……
      • 知乎、殼牌、李等17家公司被美國列入“預退市名單”(附部分公司回應)
        0閱讀 0條評論 個贊
        具體名單為:知乎、諾華家具、LOVARRA、萬春醫藥、瑞幸咖啡、極光移動、Scientific Energy、中國食品、Value Exchange International、澤爾西西醫療集團、Entrepreneur Universe Bright Group、中比能源、中網載線、尚乘國際、百世集團、理想汽車和貝殼。SEC聲稱,上述17家公司提交申辯的截止時間為當地時間5月12日?!?/div>
      • 開店寶財報:營收、交易規模大降超4成,母公司可能退市
        0閱讀 0條評論 個贊
        開店寶母公司亞聯發展發布2021年財報。2021年亞聯發展營收19.33億,較2020年的32.79億元,大降41.05%,歸屬于上市公司股東的凈利潤由2020年的虧損4.85億元,收窄到虧損2.73億元?!?/div>
      • MySQL批量插入數據 一次插入多少行數據效率最高?
        7閱讀 0條評論 個贊
        一、前言我們在操作大型數據表或者日志文件的時候經常會需要寫入數據到數據庫,那么最合適的方案就是數據庫的批量插入。只是我們在執行批量操作的時候,一次插入多少數據才合適呢?假如需要插入的數據有百萬條,那……
      • 《財富管理公司內部控制管理辦法》公開征求意見:29家公司獲批籌建 其中已開業25家
        1閱讀 0條評論 個贊
        2018年12月《商業銀行理財子公司管理辦法》(以下簡稱理財公司辦法)發布實施以來,共29家理財公司獲批籌建,其中25家獲批開業。截至2022年3月末,銀行及理財公司理財產品合計余額28.4萬億元。其中,理財公司產品余額17.3萬億元。理財公司作為具備獨立法人資格的新型資管機構,有必要盡快構筑全面有效的內控管理制度?!?/div>
      • 名稱:重慶金融職業技術學院獲準成立
        0閱讀 0條評論 個贊
        本月上旬,市委編辦批準重慶金融科技研究院登記成立,并發放事業單位法人證書。重慶金融科技研究院是重慶工商大學按照市委、市政府要求,以科技創新項目為抓手,全力參與科技創新中心和西部(重慶)科學城建設,在高新區管委會支持下設立的獨立法人單位,選址位于高新區科學谷?!?/div>
      • 比爾蓋茨診斷新冠肺炎有輕微癥狀:隔離直到我健康 數字工具將永遠存在
        1閱讀 0條評論 個贊
        比爾蓋茨周二表示,他的COVID-19 檢測呈陽性。這位微軟聯合創始人在 推特上宣布了他的病情,稱他“出現輕微癥狀,正在聽從專家的建議,隔離直到我再次健康為止?!薄?/div>
      • Square CEO杰克·多西改撕下總裁和CEO的標簽成為“塊頭”
        1閱讀 0條評論 個贊
        將Square Inc.更名為Block Inc.顯然只是杰克?多爾西(Jack Dorsey)的第一步。上周五下午, Block SQ向美國證券交易委員會透露,多爾西決定將自己在這家金融科技公司的頭銜從首席執行官、總裁兼主席改為“Block Head兼董事長”?!?/div>
      • 全球10大加密公司:比安列第四以太坊邊緣的比特幣
        0閱讀 0條評論 個贊
        近幾個月來,隨著通脹攀升,以及在動蕩的市場中,受歡迎的貨幣作為安全港和財富存儲設備的吸引力飆升,機構投資者對加密貨幣的興趣不斷增加。加密信息公司Chainalysis在4月20日的一份報告中披露,全球加密投資者在2021年實現了1627億美元的總收益,高于前一年的355億美元。美國以超過470億美元的收入領先其他國家,其次是英國、德國和日本?!?/div>
      最近發布資訊
      更多
      警花高潮嗷嗷叫
      <del id="nnjnj"></del><track id="nnjnj"></track>

      <p id="nnjnj"></p>

      <address id="nnjnj"></address>

        <pre id="nnjnj"><pre id="nnjnj"></pre></pre>

          <noframes id="nnjnj"><ruby id="nnjnj"><ruby id="nnjnj"></ruby></ruby>