Python~协程

协程

实现方式

greenlet

yield

asyncio

async & await

异步编程

事件循环

 # 伪代码
 ​
 任务列表 = [ 任务1,任务2,任务3,... ]
 ​
 while True
     可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将‘可执行’和‘已完成’的任务返回
   
   for 就绪任务 in 可执行任务列表:
     执行已就绪的任务
     
     for 已完成任务列表 in 已完成的任务列表:
     在任务列表中移除已完成的任务
     
   如果 任务列表 中的任务都已完成,则终止循环
 import asyncio
 ​
 # 生成或获取一个事件循环
 loop = asyncio.get_event_loop()
 ​
 # 将任务放到 `任务列表`
 loop.run_until_complete(任务)

快速上手

协程函数,定义函数的时候 async def 函数名

协程对象,执行 协程函数() 得到的协程对象

 async def func():
  pass
 ​
 result = func()

注意:执行协程函数创建协程对象,函数内部代码不会被执行

如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。

 import asyncio
 ​
 async def func():
  print("hello")
 ​
 result = func()
 ​
 # python3.7之前的写法
 loop = asyncio.get_event_loop()
 loop.run_until_complete(result)
 ​
 # python3.7及之后的新写法(等价于上面的写法)
 asyncio.run( result )

await

await + 可等待的对象(IO等待:协程对象、Future对象、Task对象)

示例1:

 import asyncio
 ​
 async def func():
   print("world")
   response = await asyncio.sleep(2)
   print("end.", response)
   
 asyncio.run( func() )

示例2:

 import asyncio
 ​
 async def others():
   print("start")
   await asyncio.sleep(2)
   print("end")
   return "data"
 ​
 async def func():
   print("go...")
   
   response = await others()
   print("io请求结束,结果:", response)
   
 asyncio.run( func() )

示例3:

 import asyncio
 ​
 async def others():
   print("start")
   await asyncio.sleep(2)
   print("end")
   return "data"
 ​
 async def func():
   print("go...")
   
   response1 = await others()
   print("IO请求结束,结果:", response1)
   
   response2 = await others()
   print("io请求结束,结果:", response2)
   
 asyncio.run( func() )

await 等待得到对象的返回值之后再继续往下走

Task对象

在事件循环中添加多个任务

Task用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。

除了使用asyncio.create_task()函数以外,还可以用低层级的loop.create_task()asyncio.ensure_future() 函数。不建议手动实例化 Task 对象

asyncio.create_task() python3.7及之后

asyncio.ensure_future() python3.7之前

示例1:

 import asyncio
 ​
 async def func():
   print(1)
   await asyncio.sleep(2)
   print(2)
   return "data"
 ​
 async def main():
   print("Start")
   
   # 创建Task对象,将当前执行func函数任务添加到事件循环,创建即被执行
   task1 = asyncio.create_task( func() )
   task2 = asyncio.create_task( func() )
   
   print("main end.")
   
   # 当执行某协程遇到IO操作时,会自动切换到其他任务
   # 此处await是等待相对应的协程执行完毕并获取结果
   res1 = await task1
   res2 = await task2
   print(res1, res2)
   
 asyncio.run( main() )

示例2:

 import asyncio
 ​
 async def func():
   print(1)
   await asyncio.sleep(2)
   print(2)
   return "data"
 ​
 async def main():
   print("Start")
   
   task_list = [
     asyncio.create_task( func() , name="t1"), # name参数可选,相当于任务名称,默认是:Task-n
     asyncio.create_task( func() , name="t2")
  ]
   
   print("main end.")
   
   # done是已完成任务的集合
   # pending是在任务超出timeout之后还没完成的任务,timeout默认是None,单位s
   done, pending = await asyncio.wait(task_list, timeout=None)
   print(done)
 ​
 asyncio.run( main() )

示例3:

 import asyncio
 ​
 async def func():
   print(1)
   await asyncio.sleep(2)
   print(2)
   return "data"
 ​
 ​
 task_list = [
  func(),
  func(),
 ]
   
 done, pending = asyncio.run( asyncio.wait(task_list) )
 print(done)

asyncio.Future对象

Task继承Future,Task对象内部await结果的处理基于Future对象来的。

它是一个更低级的接口

示例1:

 async def main():
   # 获取当前事件循环
   loop = asyncio.get_running_loop()
   
   # 创建一个任务(Future对象),这个任务什么也不干
   fut = loop.create_future()
   
   # 等待任务最终结果(Future对象),没有结果则会一直等下去
   await fut
   
 asyncio.run(main())

示例2:

 import asyncio
 ​
 async def set_after(fut):
   await asyncio.sleep(2)
   fut.set_result("666")
   
 async def main():
  # 获取当前事件循环
   loop = asyncio.get_running_loop()
   
   # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束
   fut = loop.create_future()
   
   # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
   # 即手动设置future任务的最终结果,那么fut就可以结束了。
   await loop.create_task(set_after(fut))
   
   data = await fut
   print(data)
 
 asyncio.run(main())

concurrent.futures.Future对象

使用线程池、进程池实现异步操作时用到的对象

 import time
 from concurrent.futures import Future
 from concurrent.futures.thread import ThreadPoolExecutor
 from concurrent.futures.process import ProcessPoolExcutor
   
 def func(value):
   time.sleep(1)
   print(value)
   return 123
   
 # 创建线程池
 pool = ThreadPoolExecutor(max_workers = 5)
 ​
 # 创建进程池
 # pool = ProcessPoolExecutor(max_workers = 5)
 ​
 for i in range(10):
   fut = pool.submit(func, i)
   print(fut)

asyncio.Futureconcurrent.futures.Future 两个对象完全没关系

这里拿出来只是为了在以后写代码可能会存在交叉使用。一部分用协程,一部分用线程池或进程池

比如:一个项目中80%都是基于协程异步编程,如果有一部分模块不支持协程(如果说MySQL不支持),则这部分使用进程或线程来做异步编程。

 import time
 import asyncio
 import concurrent.futures
 ​
 def func():
   time.sleep(2)
   return "Hi~"
 ​
 async def main():
   loop = asyncio.get_running_loop()
   
  # 1. Run in the default loop's executor (默认ThreadPoolExecutor)
  # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行fun函数,并返回一个 concurrent.futures.Future对象
  # 第二步:调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asyncio.Future 对象。
  # 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 asyncio.Future 对象才能使用。
   fut = loop.run_in_executor(None, func)
   result = await fut
   print('default thread pool', result)
   
   # 2. Run in a custom thread pool:
   with concurrent.futures.ThreadPoolExecutor() as pool:
     result = await loop.run_in_executor(pool, func)
     print('custom thread pool', result)
 ​
  # 3. Run in a custom process pool:
   with concurrent.futures.ProcessPoolExecutor() as pool:
  result = await loop.run_in_executor(pool, func)
  print('custom process pool', result)
   
 asyncio.run( main() )

案例:asyncio + 不支持协程异步的模块

 import asyncio
 import os.path
 ​
 import requests
 ​
 ​
 async def download_image(url):
     print("Start download: ", url)
 ​
     loop = asyncio.get_event_loop()
     future = loop.run_in_executor(None, requests.get, url)
 ​
     response = await future
     print("Download complete.")
 ​
     path = os.path.join('.','pic','')
     if not os.path.isdir(path):
         os.mkdir(path)
 ​
     file_name = url.rsplit('/')[-1]
     with open(path + file_name, mode='wb') as file_object:
         file_object.write(response.content)
 ​
 ​
 if __name__ == '__main__':
     url_list = [
         'https://img.tt98.com/d/file/tt98/20191218150030551/5df9c7b4ab00e.jpg',
         'https://x0.ifengimg.com/res/2021/694714439025AC794ABCD5DAC1CFD567784BB506_size237_w1024_h1537.jpeg',
         'https://img.tt98.com/d/file/tt98/20191218150030553/5df9c7b6b621a.jpg',
         'https://p2.ssl.img.360kuai.com/t019edb8df8503757c9.jpg',
         'http://image.hnol.net/c/2021-01/20/12/202101201241139537-6865224.jpg',
         'https://img.zcool.cn/community/01fb1c5e02f655a80121651844aeb4.jpg@1280w_1l_2o_100sh.jpg',
         'http://img.mm4000.com/file/9/e2/c346513bf0.jpg',
         'https://img.tt98.com/d/file/tt98/20200430180952214/f6fa506341.jpg',
    ]
 ​
     tasks = [download_image(url) for url in url_list]
     loop = asyncio.get_event_loop()
     loop.run_until_complete(asyncio.wait(tasks))

异步迭代器

什么是迭代器

python中,任意对象,只要定义了__next__方法,它就是一个迭代器。

python中的容器如列表、元组、字典、集合、字符串都可以被称作迭代器

迭代就是从迭代器中取元素的过程

比如用for循环从列表[1,2,3]中取元素,这种遍历过程就被称作迭代

如果你不想用for循环迭代呢?

这时你可以:

  1. 先调用容器(以字符串为例)的iter()函数
  2. 再使用 next() 内置函数来调用 next() 方法
  3. 当元素用尽时,next() 将引发 StopIteration 异常

什么是生成器

通过列表生成式,我们可以直接创建一个列表。

但是,受到内存限制,列表容量肯定是有限的。

而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。

所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?

这样就不必创建完整的list,从而节省大量的空间。在Python中,这种一边循环一边计算的机制,称为生成器(Generator)。

生成器也是一种迭代器,但是你只能对其迭代一次。

 # 生成一个列表
 list_1 = [i for i in range(10)] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 ​
 # 生成一个生成器
 list_2 = (i for i in range) # <generator object <genexpr> at 0x10d47a120>
 next(list_2) # 0
 next(list_2) # 1
 next(list_2) # 2
 next(list_2) # 3

使用装饰器的方式对两种生成方式的生成时间做下测试

 import time
 ​
 def log_func_time(func):
 def wrapper():
  start_time = time.perf_counter()
  my_func = func()
  end_time = time.perf_counter()
  print('方法 {} 消耗了 {} ms'.format(func.__name__, (end_time - start_time) * 1000))
  return my_func
 ​
 return wrapper
 ​
 @log_func_time
 def calculate_func_1():
 list_1 = [i for i in range(66666666)]
 ​
 @log_func_time
 def calculate_func_2():
 list_2 = (i for i in range(66666666))
 ​
 calculate_func_1()
 calculate_func_2()
 ​
 # 方法 calculate_func_1 消耗了 6139.225361000001 ms
 # 方法 calculate_func_2 消耗了 0.10221299999990663 ms

生成器生成列表list_1的时候是固定的

生成器生成list_2的时候是去记录了一定的算法规则,不需要把全部数据全部记录下来,等到我们需要的时候去调用它,这就是生成器(根据一定规律的算法生成的)

边运行边推算出结果,从而节省空间

yield

yield 和 return 有点相似

一个函数里面如果被定义了yield那么这个函数就是一个生成器

 def foo():
   print('111111')
   yield
   print('222222')
   yield
   print('333333')
   yield
   print('444444')
 ​
 f = foo()
 print(type(f)) # <class 'generator'>
 ​
 next(f) # 111111
 next(f) # 222222
 next(f) # 333333
 next(f) # 444444
 next(f) # Traceback (most recent call last):
  # File "<stdin>", line 1, in <module>
  # StopIteration
 def foo():
   yield 1
   yield 22
   yield 333
 ​
 f = foo()
 ​
 for i in f:
   print(i)  
   # 1
   # 22
   # 333

迭代器:在内部实现了__iter__()方法和__next__()方法的对象

可迭代对象:在它的类里面实现了__iter__()方法并且返回了迭代器

什么是异步迭代器

实现了 __aiter__()__anext__() 方法的对象。__anext__ 必须返回一个 awaitable 对象。async_for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。

什么是异步可迭代对象

可在async_for语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator(异步迭代器)。

 import asyncio
 ​
 class Reader(object):
     """ 自定义异步迭代器(同时也是异步可迭代对象) """
 ​
     def __init__(self):
         self.count = 0
 ​
     async def readline(self):
         self.count += 1
         if self.count == 100:
             return None
         return self.count
 ​
     def __aiter__(self):
         return self
 ​
     async def __anext__(self):
         val = await self.readline()
         if val == None:
             raise StopAsyncIteration
         return val
 ​
 ​
 async def main():
     obj = Reader()
     # async for 语句必须写在协程函数内
     async for item in obj:
         print(item)
 ​
 ​
 if __name__ == '__main__':
     asyncio.run(main())

异步上下文管理器

此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。

 import asyncio
 ​
 class AsyncContextManager:
     def __init__(self):
         print('init')
         # self.conn = conn
 ​
     async def do_something(self):
         # 异步操作数据库
         return 666
 ​
     async def __aenter__(self):
         # 异步连接数据库
         self.conn = await asyncio.sleep(1)
         return self
 ​
     async def __aexit__(self, exc_type, exc, tb):
         # 异步关闭数据库连接
         await asyncio.sleep(1)
 ​
 async def func():
     # async with 必须写在协程函数内
     async with AsyncContextManager() as f:
         result = await f.do_something()
         print(result)
 ​
 if __file__ == '__main__':
     asyncio.run(func())

uvloop

是 asyncio 的事件循环的替代方案。uvloop事件循环效率 大于 默认asyncio的事件循环

 pip3 install uvloop
 import asyncio
 import uvloop
 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 ​
 # 编写 asyncio 的代码与之前的一致
 ​
 # 内部的事件循环自动会变为uvloop
 asyncio.run(...)

注意:一个asgi:uvicorn 内部使用的就是 uvloop

异步案例

异步Redis

在使用python代码操作redis时,连接/操作/断开 都是网络IO。

 pip3 install aioredis

示例1:

 #!/usr/bin/env python
 # -*- coding:utf-8 -*-
 import asyncio
 import aioredis
 ​
 async def execute(address, password):
   print("开始执行", address)
   # 网络IO:创建redis连接
   redis = await aioredis.create_redis(address, password=password)
   
   # 网络IO:在redis中设置hash值car
   await redis.hmset_dict('car', key1=1, key2=2, key3=3)
   
   # 网络IO:去redis获取值
   result = await redis.hgetall('car', encoding='utf-8')
   print(result)
   
   redis.close
   # 网络IO:关闭redis连接
   await redis.wait_closed()
   
   print("结束", address)
   
 asyncio.run( execute('redis://127.0.0.1:6379', 'password') )

示例2:

 #!/usr/bin/env python
 # -*- coding:utf-8 -*-
 import asyncio
 import aioredis
 ​
 async def execute(address, password):
   print("开始执行", address)
   # 网络IO:创建redis连接
   redis = await aioredis.create_redis_pool(address, password=password)
   
   # 网络IO:在redis中设置hash值car
   await redis.hmset_dict('car', key1=1, key2=2, key3=3)
   
   # 网络IO:去redis获取值
   result = await redis.hgetall('car', encoding='utf-8')
   print(result)
   
   redis.close
   # 网络IO:关闭redis连接
   await redis.wait_closed()
   
   print("结束", address)
   
 task_list = [
   execute('redis://192.168.11:6379', 'password'),
   execute('redis://192.168.12:6379', 'password')
 ]
 ​
 asyncio.run( asyncio.wait(task_list) )

异步MySQL

 pip3 install aiomysql

示例1:

 import asyncio
 import aiomysql
 ​
 async def execute():
   conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
   
   cur = await conn.cursor()
   
   await cur.execute("select Host,User from user")
   
   result = await cur.fetchall()
   print(result)
   
   await cur.close()
   conn.close()
   print("结束", host)
   
 asyncio.run( execute() )

示例2:

 import asyncio
 import aiomysql
 ​
 async def execute():
   conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
   
   cur = await conn.cursor()
   
   await cur.execute("select Host,User from user")
   
   result = await cur.fetchall()
   print(result)
   
   await cur.close()
   conn.close()
   print("结束", host)
   
 task_list = [
   execute("192.168.1.11", "root123"),
   execute("192.168.1.12", "root123"),
 ]
 asyncio.run( asyncio.wait(task_list) )

FastAPI框架

 pip3 install fastapi
 pip3 install uvicorn

示例:

 import asyncio
 ​
 import uvicorn
 import aioredis
 from aioredis import Redis
 from fastapi import FastAPI
 ​
 ​
 app = FastAPI()
 ​
 # 创建一个redis连接池
 REDIS_POOL = aioredis.ConnectionsPool('redis://127.0.0.1:6379', password="root123", minsize=1, maxsize=10)
 ​
 @app.get("/")
 def index():
   """ 普通操作接口 """
   # 某个IO操作10s
   return {"message": "Hello World"}
 ​
 @app.get("/red")
 async def red():
   """ 异步操作接口 """
   print("request start.")
   
   await asyncio.sleep(3)
  # 从连接池中获取一个连接
   conn = await REDIS_POOL.acquire()
   redis = Redis(conn)
   
   # 写入
   await redis.hmset_dict('car', key1=1, key2=2, key3=3)
   
   # 读取
   result = await redis.hgetall('car', encoding='utf-8')
   print(result)
   
   # 连接归还给连接池
   REDIS_POOL.release(conn)
   
   return result
   
 if __name__ == '__main__':
   # luffy:app   lyffy 脚本名字,app FastAPI实例名称
   uvicorn().run("luffy:app", host="127.0.0.1", port=5000, log_level="info")

爬虫

 pip3 install aiohttp
 import aiohttp
 import asyncio
 ​
 async def fetch(session, url):
   print("request start: ", url)
   async with session.get(url, verify_ssl=False) as response:
     text = await response.text()
     print("get result: ", url, len(text))
     return text
   
 async def main():
   async with aiohttp.ClientSession() as session:
     url_list = [
       'https://python.org',
       'https://www.baidu.com',
       'https://www.google.com'
     ]
     tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list ]
     
     done, pending = await asyncio.wait(tasks)
     
 if __name__ == '__main__':
   asyncio.run( main() )

学习来源:哔哩哔哩_bilibili

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇