协程
实现方式
greenlet
yield
asyncio
async & await
异步编程
事件循环
# 伪代码
任务列表 = [ 任务1,任务2,任务3,... ]
while True
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将‘可执行’和‘已完成’的任务返回
for 就绪任务 in 可执行任务列表:
执行已就绪的任务
for 已完成任务列表 in 已完成的任务列表:
在任务列表中移除已完成的任务
如果 任务列表 中的任务都已完成,则终止循环
import asyncio
# 生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到 `任务列表`
loop.run_until_complete(任务)
快速上手
协程函数,定义函数的时候 async def 函数名
协程对象,执行 协程函数() 得到的协程对象
async def func():
pass
result = func()
注意:执行协程函数创建协程对象,函数内部代码不会被执行
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。
import asyncio
async def func():
print("hello")
result = func()
# python3.7之前的写法
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
# python3.7及之后的新写法(等价于上面的写法)
asyncio.run( result )
await
await + 可等待的对象(IO等待:协程对象、Future对象、Task对象)
示例1:
import asyncio
async def func():
print("world")
response = await asyncio.sleep(2)
print("end.", response)
asyncio.run( func() )
示例2:
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "data"
async def func():
print("go...")
response = await others()
print("io请求结束,结果:", response)
asyncio.run( func() )
示例3:
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "data"
async def func():
print("go...")
response1 = await others()
print("IO请求结束,结果:", response1)
response2 = await others()
print("io请求结束,结果:", response2)
asyncio.run( func() )
await 等待得到对象的返回值之后再继续往下走
Task对象
在事件循环中添加多个任务
Task用于并发调度协程,通过asyncio.create_task(协程对象)
的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。
除了使用asyncio.create_task()
函数以外,还可以用低层级的loop.create_task()
或 asyncio.ensure_future()
函数。不建议手动实例化 Task 对象
asyncio.create_task() python3.7及之后
asyncio.ensure_future() python3.7之前
示例1:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "data"
async def main():
print("Start")
# 创建Task对象,将当前执行func函数任务添加到事件循环,创建即被执行
task1 = asyncio.create_task( func() )
task2 = asyncio.create_task( func() )
print("main end.")
# 当执行某协程遇到IO操作时,会自动切换到其他任务
# 此处await是等待相对应的协程执行完毕并获取结果
res1 = await task1
res2 = await task2
print(res1, res2)
asyncio.run( main() )
示例2:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "data"
async def main():
print("Start")
task_list = [
asyncio.create_task( func() , name="t1"), # name参数可选,相当于任务名称,默认是:Task-n
asyncio.create_task( func() , name="t2")
]
print("main end.")
# done是已完成任务的集合
# pending是在任务超出timeout之后还没完成的任务,timeout默认是None,单位s
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run( main() )
示例3:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "data"
task_list = [
func(),
func(),
]
done, pending = asyncio.run( asyncio.wait(task_list) )
print(done)
asyncio.Future对象
Task继承Future,Task对象内部await结果的处理基于Future对象来的。
它是一个更低级的接口
示例1:
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么也不干
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果则会一直等下去
await fut
asyncio.run(main())
示例2:
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("666")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束
fut = loop.create_future()
# 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
# 即手动设置future任务的最终结果,那么fut就可以结束了。
await loop.create_task(set_after(fut))
data = await fut
print(data)
asyncio.run(main())
concurrent.futures.Future对象
使用线程池、进程池实现异步操作时用到的对象
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExcutor
def func(value):
time.sleep(1)
print(value)
return 123
# 创建线程池
pool = ThreadPoolExecutor(max_workers = 5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers = 5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
asyncio.Future
和 concurrent.futures.Future
两个对象完全没关系
这里拿出来只是为了在以后写代码可能会存在交叉使用。一部分用协程,一部分用线程池或进程池
比如:一个项目中80%都是基于协程异步编程,如果有一部分模块不支持协程(如果说MySQL不支持),则这部分使用进程或线程来做异步编程。
import time
import asyncio
import concurrent.futures
def func():
time.sleep(2)
return "Hi~"
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor (默认ThreadPoolExecutor)
# 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行fun函数,并返回一个 concurrent.futures.Future对象
# 第二步:调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asyncio.Future 对象。
# 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为 asyncio.Future 对象才能使用。
fut = loop.run_in_executor(None, func)
result = await fut
print('default thread pool', result)
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, func)
print('custom thread pool', result)
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, func)
print('custom process pool', result)
asyncio.run( main() )
案例:asyncio + 不支持协程异步的模块
import asyncio
import os.path
import requests
async def download_image(url):
print("Start download: ", url)
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, requests.get, url)
response = await future
print("Download complete.")
path = os.path.join('.','pic','')
if not os.path.isdir(path):
os.mkdir(path)
file_name = url.rsplit('/')[-1]
with open(path + file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://img.tt98.com/d/file/tt98/20191218150030551/5df9c7b4ab00e.jpg',
'https://x0.ifengimg.com/res/2021/694714439025AC794ABCD5DAC1CFD567784BB506_size237_w1024_h1537.jpeg',
'https://img.tt98.com/d/file/tt98/20191218150030553/5df9c7b6b621a.jpg',
'https://p2.ssl.img.360kuai.com/t019edb8df8503757c9.jpg',
'http://image.hnol.net/c/2021-01/20/12/202101201241139537-6865224.jpg',
'https://img.zcool.cn/community/01fb1c5e02f655a80121651844aeb4.jpg@1280w_1l_2o_100sh.jpg',
'http://img.mm4000.com/file/9/e2/c346513bf0.jpg',
'https://img.tt98.com/d/file/tt98/20200430180952214/f6fa506341.jpg',
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
异步迭代器
什么是迭代器
python中,任意对象,只要定义了__next__
方法,它就是一个迭代器。
python中的容器如列表、元组、字典、集合、字符串都可以被称作迭代器
迭代就是从迭代器中取元素的过程
比如用for循环从列表[1,2,3]中取元素,这种遍历过程就被称作迭代
如果你不想用for循环迭代呢?
这时你可以:
- 先调用容器(以字符串为例)的iter()函数
- 再使用 next() 内置函数来调用 next() 方法
- 当元素用尽时,next() 将引发 StopIteration 异常
什么是生成器
通过列表生成式,我们可以直接创建一个列表。
但是,受到内存限制,列表容量肯定是有限的。
而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。
所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?
这样就不必创建完整的list,从而节省大量的空间。在Python中,这种一边循环一边计算的机制,称为生成器(Generator)。
生成器也是一种迭代器,但是你只能对其迭代一次。
# 生成一个列表
list_1 = [i for i in range(10)] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 生成一个生成器
list_2 = (i for i in range) # <generator object <genexpr> at 0x10d47a120>
next(list_2) # 0
next(list_2) # 1
next(list_2) # 2
next(list_2) # 3
使用装饰器的方式对两种生成方式的生成时间做下测试
import time
def log_func_time(func):
def wrapper():
start_time = time.perf_counter()
my_func = func()
end_time = time.perf_counter()
print('方法 {} 消耗了 {} ms'.format(func.__name__, (end_time - start_time) * 1000))
return my_func
return wrapper
@log_func_time
def calculate_func_1():
list_1 = [i for i in range(66666666)]
@log_func_time
def calculate_func_2():
list_2 = (i for i in range(66666666))
calculate_func_1()
calculate_func_2()
# 方法 calculate_func_1 消耗了 6139.225361000001 ms
# 方法 calculate_func_2 消耗了 0.10221299999990663 ms
生成器生成列表list_1的时候是固定的
生成器生成list_2的时候是去记录了一定的算法规则,不需要把全部数据全部记录下来,等到我们需要的时候去调用它,这就是生成器(根据一定规律的算法生成的)
边运行边推算出结果,从而节省空间
yield
yield 和 return 有点相似
一个函数里面如果被定义了yield那么这个函数就是一个生成器
def foo():
print('111111')
yield
print('222222')
yield
print('333333')
yield
print('444444')
f = foo()
print(type(f)) # <class 'generator'>
next(f) # 111111
next(f) # 222222
next(f) # 333333
next(f) # 444444
next(f) # Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# StopIteration
def foo():
yield 1
yield 22
yield 333
f = foo()
for i in f:
print(i)
# 1
# 22
# 333
迭代器:在内部实现了__iter__()
方法和__next__()
方法的对象
可迭代对象:在它的类里面实现了__iter__()
方法并且返回了迭代器
什么是异步迭代器
实现了 __aiter__()
和 __anext__()
方法的对象。__anext__
必须返回一个 awaitable
对象。async_for
会处理异步迭代器的 __anext__()
方法所返回的可等待对象,直到其引发一个 StopAsyncIteration
异常。
什么是异步可迭代对象
可在async_for
语句中被使用的对象。必须通过它的 __aiter__()
方法返回一个 asynchronous iterator
(异步迭代器)。
import asyncio
class Reader(object):
""" 自定义异步迭代器(同时也是异步可迭代对象) """
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def main():
obj = Reader()
# async for 语句必须写在协程函数内
async for item in obj:
print(item)
if __name__ == '__main__':
asyncio.run(main())
异步上下文管理器
此种对象通过定义 __aenter__()
和 __aexit__()
方法来对 async with
语句中的环境进行控制。
import asyncio
class AsyncContextManager:
def __init__(self):
print('init')
# self.conn = conn
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步连接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# 异步关闭数据库连接
await asyncio.sleep(1)
async def func():
# async with 必须写在协程函数内
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
if __file__ == '__main__':
asyncio.run(func())
uvloop
是 asyncio 的事件循环的替代方案。uvloop事件循环效率 大于 默认asyncio的事件循环
pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写 asyncio 的代码与之前的一致
# 内部的事件循环自动会变为uvloop
asyncio.run(...)
注意:一个asgi:uvicorn
内部使用的就是 uvloop
异步案例
异步Redis
在使用python代码操作redis时,连接/操作/断开 都是网络IO。
pip3 install aioredis
示例1:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aioredis
async def execute(address, password):
print("开始执行", address)
# 网络IO:创建redis连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO:在redis中设置hash值car
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO:去redis获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close
# 网络IO:关闭redis连接
await redis.wait_closed()
print("结束", address)
asyncio.run( execute('redis://127.0.0.1:6379', 'password') )
示例2:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aioredis
async def execute(address, password):
print("开始执行", address)
# 网络IO:创建redis连接
redis = await aioredis.create_redis_pool(address, password=password)
# 网络IO:在redis中设置hash值car
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO:去redis获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close
# 网络IO:关闭redis连接
await redis.wait_closed()
print("结束", address)
task_list = [
execute('redis://192.168.11:6379', 'password'),
execute('redis://192.168.12:6379', 'password')
]
asyncio.run( asyncio.wait(task_list) )
异步MySQL
pip3 install aiomysql
示例1:
import asyncio
import aiomysql
async def execute():
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
cur = await conn.cursor()
await cur.execute("select Host,User from user")
result = await cur.fetchall()
print(result)
await cur.close()
conn.close()
print("结束", host)
asyncio.run( execute() )
示例2:
import asyncio
import aiomysql
async def execute():
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
cur = await conn.cursor()
await cur.execute("select Host,User from user")
result = await cur.fetchall()
print(result)
await cur.close()
conn.close()
print("结束", host)
task_list = [
execute("192.168.1.11", "root123"),
execute("192.168.1.12", "root123"),
]
asyncio.run( asyncio.wait(task_list) )
FastAPI框架
pip3 install fastapi
pip3 install uvicorn
示例:
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
# 创建一个redis连接池
REDIS_POOL = aioredis.ConnectionsPool('redis://127.0.0.1:6379', password="root123", minsize=1, maxsize=10)
@app.get("/")
def index():
""" 普通操作接口 """
# 某个IO操作10s
return {"message": "Hello World"}
@app.get("/red")
async def red():
""" 异步操作接口 """
print("request start.")
await asyncio.sleep(3)
# 从连接池中获取一个连接
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
# 写入
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 读取
result = await redis.hgetall('car', encoding='utf-8')
print(result)
# 连接归还给连接池
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
# luffy:app lyffy 脚本名字,app FastAPI实例名称
uvicorn().run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
爬虫
pip3 install aiohttp
import aiohttp
import asyncio
async def fetch(session, url):
print("request start: ", url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print("get result: ", url, len(text))
return text
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://python.org',
'https://www.baidu.com',
'https://www.google.com'
]
tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list ]
done, pending = await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run( main() )
学习来源:哔哩哔哩_bilibili