目录
  1. python异步编程
  2. 相关语法
    1. async
    2. await
    3. asyncio.run()
    4. asyncio.create_task()
  3. 经典‘生产者消费者’模型
python异步机制

python异步编程

python的异步机制在2.7以后才陆续被官方引入,而且到目前为止,仍然没有停止编程范式的修改,所以在网上和python书籍上能看到不同版本的写法,以及查看官方文档。

官方文档

相关语法

  • 协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。例如,以下代码段 (需要 Python 3.7+) 打印 “hello”,等待 1 秒,然后打印 “world”:
1
2
3
4
5
6
7
8
import asyncio

async def main():
print('hello')
await asyncio.sleep(1)
print('world')

asyncio.run(main())
  • 输出:
1
2
hello
world

async

  • async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是 sleep(5) )消失后,也就是5秒到了再回来执行。
    • 协程函数: 定义形式为 async def 的函数;
    • 协程对象: 调用 协程函数 所返回的对象。

await

await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象

可等待对象有三种主要类型: 协程, 任务 和 Future.

asyncio.run()

asyncio.run() 函数用来运行最高层级的入口点 “main()” 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
  • 等待一个协程。代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world”:
1
2
3
4
started at 17:13:52
hello
world
finished at 17:13:55

asyncio.create_task()

  • asyncio.create_task() 函数用来并发运行作为 asyncio 任务的多个协程。
  • 当一个协程通过 asyncio.create_task() 等函数被打包为一个任务,该协程将自动排入日程准备立即运行。
1
2
3
4
5
6
7
8
9
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")
  • 输出显示代码段的运行时间比之前快了 1 秒:
1
2
3
4
started at 17:14:32
hello
world
finished at 17:14:34

经典‘生产者消费者’模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Mode(object):
def __init__(self,maxsize):
self.num_workers=maxsize
self.queue=None
async def producer(self):
print('生产者: 开始')
for i in range(self.num_workers * 10):
await self.queue.put(i)
print(f'生产者: 添加任务 {i} 到队列')
print('生产者: 准备关门了')
for i in range(self.num_workers):
await self.queue.put(None)
print('生产者: 等待队列清空')
await self.queue.join()
print('生产者: 结束')
async def consumer(self,n):
print(f'消费者{n}号: 开始')
while True:
print(f'消费者 {n}: 等待')
item = await self.queue.get()
print(f'消费者 {n}: 获取到了 {item}')
if item is None:
self.queue.task_done()
break
else:
await asyncio.sleep(0.1 * item)
self.queue.task_done()
print(f'消费者{n}号: 结束')
async def run(self):
self.queue=asyncio.Queue(maxsize=self.maxsize)
consumers = [
asyncio.create_task(self.consumer(i)) for i in range(self.num_workers)
]
gather = consumers+[asyncio.create_task(self.producer())]
await asyncio.gather(*gather)

if __name__ == "__main__":
mode=Mode(5)
asyncio.run(mode.run())
文章作者: Haibei
文章链接: http://www.haibei.online/posts/3510155220.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Haibei的博客
打赏
  • 微信
  • 支付宝

评论