python异步编程
python的异步机制在2.7以后才陆续被官方引入,而且到目前为止,仍然没有停止编程范式的修改,所以在网上和python书籍上能看到不同版本的写法,以及查看官方文档。
官方文档
相关语法
- 协程通过
async/await
语法进行声明,是编写异步应用的推荐方式。例如,以下代码段 (需要 Python 3.7+) 打印 “hello”,等待 1 秒,然后打印 “world”:
1 2 3 4 5 6 7 8
| import asyncio
async def main(): print('hello') await asyncio.sleep(1) print('world') asyncio.run(main())
|
async
async
用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是 sleep(5)
)消失后,也就是5秒到了再回来执行。- 协程函数: 定义形式为
async def
的函数; - 协程对象: 调用 协程函数 所返回的对象。
await
await
用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。
如果一个对象可以在 await
语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。
可等待对象有三种主要类型: 协程, 任务 和 Future.
asyncio.run()
asyncio.run()
函数用来运行最高层级的入口点 “main()” 函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import asyncio import time
async def say_after(delay, what): await asyncio.sleep(delay) print(what)
async def main(): print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello') await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
|
- 等待一个协程。代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world”:
1 2 3 4
| started at 17:13:52 hello world finished at 17:13:55
|
asyncio.create_task()
asyncio.create_task()
函数用来并发运行作为 asyncio
任务的多个协程。- 当一个协程通过
asyncio.create_task()
等函数被打包为一个任务,该协程将自动排入日程准备立即运行。
1 2 3 4 5 6 7 8 9
| async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") await task1 await task2 print(f"finished at {time.strftime('%X')}")
|
1 2 3 4
| started at 17:14:32 hello world finished at 17:14:34
|
经典‘生产者消费者’模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class Mode(object): def __init__(self,maxsize): self.num_workers=maxsize self.queue=None async def producer(self): print('生产者: 开始') for i in range(self.num_workers * 10): await self.queue.put(i) print(f'生产者: 添加任务 {i} 到队列') print('生产者: 准备关门了') for i in range(self.num_workers): await self.queue.put(None) print('生产者: 等待队列清空') await self.queue.join() print('生产者: 结束') async def consumer(self,n): print(f'消费者{n}号: 开始') while True: print(f'消费者 {n}: 等待') item = await self.queue.get() print(f'消费者 {n}: 获取到了 {item}') if item is None: self.queue.task_done() break else: await asyncio.sleep(0.1 * item) self.queue.task_done() print(f'消费者{n}号: 结束') async def run(self): self.queue=asyncio.Queue(maxsize=self.maxsize) consumers = [ asyncio.create_task(self.consumer(i)) for i in range(self.num_workers) ] gather = consumers+[asyncio.create_task(self.producer())] await asyncio.gather(*gather)
if __name__ == "__main__": mode=Mode(5) asyncio.run(mode.run())
|