cover

Python协程入门

本文发布之时,Python 3.9已经停止支持,因为没有考虑3.9及之前版本的语法兼容性,请使用Python 3.10及以上版本运行示例代码。

引言

并发 (Concurrency) ,是计算机领域最重要的概念之一,它指的是多个任务在一段时间间隔内同时执行。实现并发有两种常见途径:多个CPU同时工作(此时一般称为并行,但是在广义上也属于并发的一种);单个CPU,但是多个任务在一段时间内交替执行。

对于计算密集型任务,想要提升效率除了并行之外别无它法。但是对于不那么计算密集的任务,则可以通过合理 调度让这些任务交替执行,在任务A不需要CPU的时候,就让任务A让出CPU的使用权,转而让任务B执行。这样就可以提高CPU的利用率与整个系统的任务吞吐率。

通常语境下,我们说并发的时候是指第二种途径。因为第一种途径需要解决的是数据一致性、数据并行度等问题,一般会直接用并行(Parallelism)这个术语。而第二种途径的关注点则在于——调度

想象一个场景:你是一名老师,在进行课程答疑,每个学生都有一个问题要询问你,对于每个学生,你都需要做3个步骤:告诉他解题思路;让他自己完成题目;你检查他的答案是否正确。在这个场景下,合理的做法显然是:先告诉学生A做题思路,然后让他自己完成题目,在他做题的时候,你可以向学生B讲解思路,而不必傻等学生A做题,等学生A做完再去检查即可。

这里的老师就相当于CPU,计算就相当于讲解解题思路和检查答案(都需要占用CPU),学生完成题目则是不需要CPU参与的环节(IO操作)。

一切看上去似乎很美好。

但是物理世界中很多随手可以解决的问题,在计算机中都需要进行精密设计:

  • 学生做完了如何让我得知?是我一个个问,还是让他们主动告诉我?
  • 如果是一个个问,什么时机去问?
  • 如果是学生主动告诉我,学生A做完让我检查的时候,我正在给学生B讲思路,此时是打断讲解还是让学生A先等着?
  • 学生C的题目非常复杂,我需要讲解很久,是一直给C讲让别人等着还是讲到一半先给其他人讲?

这些都是调度者需要考虑的问题。

本文的重点不是探究这些问题是如何解决或权衡的,举这个例子只是想明确一个要点:并发系统中一定存在一个调度器,这个调度器按照一定的机制对各个任务进行调度,以便它们高效地交替运行。

在应用层面,我们只需要向这个调度器提交任务并等待任务完成,调度器的原理只需要大概了解即可。

线程与协程

在主流操作系统中,任务被抽象成线程(Thread),操作系统中的调度器对线程进行调度。当一个线程进行IO操作(文件读写、网络请求等)时,调度器会将这个线程放到阻塞队列中,转而去执行其它线程,当IO操作完成时,再将这个线程放入就绪队列,等待下次执行。

在每次线程切换时,操作系统都要进行备份现场和恢复现场的操作(主要是各种寄存器),会引入额外的开销。对于后台应用,如果在处理用户请求时需要进行数据库读写等网络IO操作,那么就需要为每个请求开辟一个线程(不然请求A的IO操作阻塞了整个线程,那么这个线程中的其它请求也只能被迫挂起)。很容易想到,请求越多,线程数就越多,线程切换次数也越多,额外开销更多,请求多到一定程度,就会撑爆服务器。

换句话说,线程不够轻量,创建线程和切换线程的代价都比较大。因此协程应运而生。

协程(Coroutine)也是对任务的抽象。

仿照OS调度器的思路,在应用程序层面实现一个协程调度器,这个调度器可以调度协程,在协程切换时,不需要OS介入,就不会产生线程切换的开销,同时一个请求只需要一个协程就够了,也可以减少OS中的线程数量。

在很多编程语言中,都把这个协程调度器称为事件循环(Event Loop)。

如果你足够敏锐,立刻就会发现一个关键问题:IO总是存在的,在协程中进行IO的时候,不还是把整个线程阻塞了吗?这样事件循环本身都被挂起了,还怎么调度别的协程呢?

为了解决这个问题,主流操作系统都引入了非阻塞式IO的概念。线程之所以会被阻塞,是因为OS的调度器为了不浪费CPU主动把线程挂起的,只要在IO时告诉OS“我要进行IO,但是不要挂起我,我还有其它事要干”,OS就不会把这个线程挂起,这就是非阻塞式IO。

那普通的线程为什么不这么干呢?因为它没有事件循环,进行IO时它除了等没有别的事情干,保留CPU使用权也没有用;而事件循环则可以将当前进行IO的协程挂起,转而去执行其它的协程,不会浪费CPU。

 提示

在现代OS架构中,所有的IO操作都要通过OS提供的系统调用进行,系统调用会让OS会获得控制权,进而进行线程调度。

所谓控制权,就是CPU当前在执行谁的代码。

除了非阻塞式IO,使用协程还需要一个先决条件:IO多路复用。顾名思义,就是它可以让一个线程监听多个IO行为。这样一个线程内的多个协程才可以任意发起IO而不产生冲突。复用这个词,指的是IO行为复用线程。如果不复用,每个IO行为都需要一个线程进行监听。

所谓异步编程,就是有意识的在应用程序层面使用协程进行并发,不只依赖于OS在背后帮你用线程进行并发。

 重要

在编写异步代码时,千万不要在协程中同步IO,这会导致整个线程被挂起,整个事件循环都会被停掉。对并发要求不是特别高的时候,同步打印log一般可以接受,因为整个打印过程很短,但还是推荐使用日志队列,将具体的打印动作委托给其它线程执行。

协程对象与Task对象

Python官方的异步编程框架是asyncio,它的思路与前面讲的基本一致:事件循环调度Task,每个Task都与一个协程对象关联,一个协程对象类似一段函数代码,事件循环驱动Task执行时,就是在执行这些代码,并把执行结果(返回值)放到Task对象中。

虽然事件循环直接调度的不是协程,但是本质并无差别。

要得到协程对象,需要先定义协程函数

 1async def delay_add(x: int, y: int, delay: int = 1) -> int:
 2    print(f"adding {x} and {y}")
 3    return x + y
 4
 5
 6print(type(delay_add))
 7
 8coro = delay_add(1, 2)
 9
10print(type(coro))

运行代码,会得到下面的结果:

1<class 'function'>
2<class 'coroutine'>
3<sys>:0: RuntimeWarning: coroutine 'delay_add' was never awaited

这里的delay_add就是一个协程函数(本质上还是函数,所以type是function),调用这种函数时,不会执行函数内的代码,而是直接返回了一个协程对象。

因为协程一直没有被驱动执行整个程序就结束了,所以Python raise了一个RuntimeWarning告知你这一情况。不执行代码一般不会产生破坏性结果,所以只是一个Warning而非Error。

那怎么驱动协程执行内部的代码?很简单,创建一个Task让事件循环调度这个Task就行了:

 1import asyncio
 2
 3
 4async def delay_add(x: int, y: int, delay: int = 1) -> int:
 5    print(f"adding {x} and {y}")
 6    return x + y
 7
 8
 9loop = asyncio.new_event_loop()
10task = loop.create_task(delay_add(1, 2))
11loop.run_until_complete(task)
12print(task.result())
13loop.close()

这段代码很好理解,创建事件循环、创建任务、调度任务、获取任务执行结果、关闭事件循环。

唯一需要注意的是,创建任务时需要将任务绑定到事件循环上,不然这个任务无法创建。

上面的代码只是为了演示原理,实际使用中,一般会用更高层的封装接口asyncio.run。它会自动获取当前运行的事件循环(不存在则创建)、自动将协程对象包装成Task(也可以直接run一个Task)并运行。

在大多数异步框架中,框架运行之初就创建好了事件循环并驱动框架的主任务执行,不需要我们使用手动驱动任务,因此在实际开发中运行一个协程更常用的方式是——在一个协程内部,用await驱动另外一个协程。

await

await关键字的作用和其语义一样——等待。

await coro的意思是告诉Event Loop,我需要等待coroutine执行完毕才能接着往下进行。也就是说,await会把控制权交还给事件循环。事件循环拿到控制权之后,会暂停当前协程(记为协程A),等corotine执行完毕之后,再把协程A唤醒,继续执行协程A的代码。

await前面还可以赋值:res = await coro。赋值号之后的流程和前述一致,不一样的地方是“继续执行协程A的代码”执行的就是这个赋值,也就是会把coro内的代码运行结果赋值给res

如果coro内的任务需要进行IO,事件循环自然就可以趁这个空闲去执行协程B,从而实现协程切换。

看个例子:

 1import asyncio
 2import time
 3
 4BASE_TIME = time.time()
 5
 6
 7async def delay_calc(x: int, y: int, delay: int = 1) -> int:
 8    await asyncio.sleep(delay)
 9    return x + y
10
11
12# 演示一下await赋值
13async def wrapper(x: int, y: int, delay: int = 1) -> None:
14    print(f"adding {x} and {y}, time: {time.time() - BASE_TIME}")
15    res = await delay_calc(x, y, delay)
16    print(f"result of {x} + {y}: {res}, time: {time.time() - BASE_TIME}")
17
18
19if __name__ == "__main__":
20    loop = asyncio.new_event_loop()
21    loop.run_until_complete(
22        asyncio.gather(
23            loop.create_task(wrapper(1, 2)),
24            loop.create_task(wrapper(3, 4)),
25        )
26    )
27    loop.close()

运行结果:

1adding 1 and 2, time: 0.0007719993591308594
2adding 3 and 4, time: 0.0009663105010986328
3result of 1 + 2: 3, time: 1.0045411586761475
4result of 3 + 4: 7, time: 1.0047202110290527

结果和前面流程基本一致:协程A在res = await delay_calc(x, y, delay)让出执行权,事件循环转而是驱动协程B执行,由于让出执行权和协程切换非常迅速,因此两个协程几乎同时输出了 adding … ,也基本同时结束。在这1s内,两个协程实现了并发执行。

 注意

这段代码是有瑕疵的,print()函数是阻塞式IO,会阻塞整个线程,但不影响我们理解原理。

当一个协程开始运行之后,CPU执行权在协程这里,事件循环是无法获得CPU的,只有协程主动await或运行结束时,事件循环才能获得CPU。因此事件循环无法主动暂停或终止协程。

sleep时不能用time.sleep(),因为这个是同步函数,会阻塞整个线程,达不到并发执行的效果。

 提示

在描述await的原理时,我刻意忽略了一个很关键的问题:被await的协程是如何被执行的?会创建一个新的Task注册到事件循环中吗?

答案是不会创建新的Task。在协程中await另一个协程,会直接执行新协程的代码,直到遇到实在无法立即执行的地方,才会真正把控制权交给事件循环。

站在使用者的角度,可以不用关心控制权是何时让出的,只需要知道await最终确实会交出控制权,等待被await的东西执行完后之后再回来执行当前协程。

除了协程对象之外,await还可以等待Task和Future等可等待对象(Awaitable)。

Future对象

Future对象是一个占位符,它表达的意思是:未来会有一个“人”(通常是协程)把执行结果放到这里。await一个Future的意思就是,我在这等着这个Future拿到它应得的结果。当然也可以用res = await future把这个结果赋值给res

asyncio.gather返回的就是一个Future对象。

Future对象是整个asyncio实现上的核心,Event Loop调度Task、await真正让出控制权,都离不开Future,甚至Task本身就是Future的子类。但是在应用层面,我们无需太过关心,只需要理解Future是一个占位符即可。

如果需要更深入asyncio的原理,或者要写一些asyncio没有提供的底层异步代码,可以参考asyncio的源码(纯Python写的,但代码有点ugly,没有类型注释,很多方法IDE跳转不过去)。

应用举例

并发请求

我们可以搭配异步HTTP库并发发送HTTP请求。

 1import asyncio
 2import time
 3
 4import httpx
 5
 6ASYNC_HTTPX_CLIENT = httpx.AsyncClient()
 7
 8
 9async def _get_index_length_single(url: str) -> int:
10    try:
11        response = await ASYNC_HTTPX_CLIENT.get(url)
12        return len(response.text)
13    except Exception:
14        return -1
15
16
17async def get_index_length(urls: list[str]) -> list[int]:
18    time_start = time.time()
19    results = await asyncio.gather(*(_get_index_length_single(url) for url in urls))
20    time_end = time.time()
21    print(f"URLs: {len(urls)}, Time: {time_end - time_start}")
22    return results
23
24
25async def main():
26    urls = [
27        "https://www.bing.com",
28        "https://www.baidu.com",
29        "https://www.sogou.com",
30        "https://www.zhihu.com",
31        "https://www.bilibili.com",
32    ]
33    print(await get_index_length(urls[:1]))
34    print(await get_index_length(urls[1:]))
35
36
37if __name__ == "__main__":
38    asyncio.run(main())

实际运行这段代码会发现,1个URL和4个URL耗时相差无几,但是如果使用同步代码依次请求4个URL,耗时是1个URL的4倍。

并发请求除了可以在业务代码中使用,也可以用于大模型吞吐量测试、简单压力测试等需要并发请求的脚本。

 提示

如果asyncio.gather()内的协程或任务执行时可能出现异常,可以将return_exceptions=True参数传入asyncio.gather(),这样Future结果中对应位置的结果是一个Exception,可以在遍历结果时进行处理。如果没有设置return_exceptions=True,gather会在遇到第一个Task出现异常时,捕获并抛出这个异常,并且其它的Task不会被取消。

在使用asyncio.gather()时,要么保证传入的协程内妥善处理了各种异常,要么设置return_exceptions=True,不然就可能产生一些孤儿Task。

SSE与异步生成器

在构建LLM应用时,经常需要以SSE的形式返回内容。这样就形成了一个生产者-消费者模型,异步生成器A通过流式请求LLM生产内容,异步生成器B消费这些内容(包装之后发送给客户端)。下面是一个简单的模拟:

 1import asyncio
 2from random import randint
 3from typing import AsyncGenerator
 4
 5
 6async def content_generator() -> AsyncGenerator[str, None]:
 7    text = "你好,我是一个大模型应用。"
 8    i, j = 0, len(text)
 9    while i < j:
10        delta = randint(1, 5)
11        yield text[i : i + delta]
12        i += delta
13        await asyncio.sleep(0.1)
14
15
16async def sse_generator() -> AsyncGenerator[str, None]:
17    async for content in content_generator():
18        yield f'data: {{"content":"{content}"}}\n\n'
19
20
21async def client():
22    async for line in sse_generator():
23        print(line)
24
25
26if __name__ == "__main__":
27    asyncio.run(client())

这个例子主要想说明使用async for in遍历异步生成器这一用法。

任务查看

假设你正在并发执行若干个任务,每个任务的耗时都不确定,你想隔段时间查看这些任务的完成情况,以便监控或终止耗时过长的任务。

 1import asyncio
 2import random
 3import time
 4
 5
 6async def random_task() -> float:
 7    sleep_time = random.random() * 5
 8    await asyncio.sleep(sleep_time)
 9    return sleep_time
10
11
12async def main():
13    pending_set = [asyncio.create_task(random_task()) for _ in range(10)]
14    done_set = None
15
16    results = []
17
18    start_time = time.time()
19
20    while pending_set:
21        done_set, pending_set = await asyncio.wait(pending_set, timeout=1.0)
22        cost_time = time.time() - start_time
23
24        for task in done_set:
25            results.append(task.result())
26
27        print(
28            "cost time: {:.2f}, pending: {}, done: {}".format(
29                cost_time, len(pending_set), len(results)
30            )
31        )
32
33        if cost_time > 3:
34            print("超时,未完成任务将被取消")
35            for task in pending_set:
36                task.cancel()
37            break
38
39    print(results)
40
41
42if __name__ == "__main__":
43    asyncio.run(main())

 提示

这里用的是asyncio.create_task(),这个方法会自动获取当前正在运行的Event Loop,然后用它创建任务(如果没有正在运行的Event Loop,这个方法会报错)。因为创建任务是在main这个协程内部,所以实际的创建动作必然在main协程运行时时,此时必然有一个正在运行的Event Loop。

当前正在运行的Event Loop也可以用asyncio.get_running_loop()来获取。

 提示

任务本身出现异常时,asyncio.wait()不会抛出这个异常,而是将调用task.set_exception()将任务标记为出现了异常,此时直接使用task.result()获取结果,会抛出任务的原始异常。在不确定任务会不会遇到异常时,在取结果之前可以用task.exception()进行判断。当然也可以用try ... except包括task.result()

to_thread

如果要执行的任务是同步IO,且没有异步替代方案,可以使用asyncio.to_thread()将任务委托给另外一个线程执行:

 1import asyncio
 2import time
 3
 4BASE_TIME = time.time()
 5
 6
 7async def delay_calc(x: int, y: int, delay: int = 1) -> None:
 8    print(f"adding {x} and {y}, time: {time.time() - BASE_TIME}")
 9    await asyncio.to_thread(time.sleep, delay)
10    print(f"result of {x} + {y}: {x + y}, time: {time.time() - BASE_TIME}")
11
12
13if __name__ == "__main__":
14    loop = asyncio.new_event_loop()
15    loop.run_until_complete(
16        asyncio.gather(
17            loop.create_task(delay_calc(1, 2)),
18            loop.create_task(delay_calc(3, 4)),
19        )
20    )
21    loop.close()

 注意

如果开启了GIL,不要把计算密集型任务委托给其它线程执行,因为它会和主线程争用GIL。此时应该使用多进程。

 提示

asyncio.to_thread()中函数执行出现异常,只能在asyncio.to_thread()外部用try ... except捕获。

run_in_executor

loop.run_in_executor(executor, func, *args)是将func放在executor中执行,executor可以是concurrent.futures.ThreadPoolExecutor(不推荐,3.10之后可以使用to_thread()替代),或者concurrent.futures.ProcessPoolExecutor(用于计算密集型任务)。

下面是一个用进程池异步执行计算密集型任务的例子:

 1import asyncio
 2import math
 3import random
 4import time
 5from concurrent.futures import ProcessPoolExecutor
 6
 7
 8def cpu_bound_calc(task_name: str) -> float:
 9    start_time = time.time()
10    length = 10**7
11    res = sum(math.sqrt(random.random()) for _ in range(length)) / length
12    print(f"{task_name} cost time: {time.time() - start_time}")
13    return res
14
15
16async def main():
17    loop = asyncio.get_running_loop()
18    start_time = time.time()
19    with ProcessPoolExecutor(max_workers=2) as executor:
20        results = await asyncio.gather(
21            loop.run_in_executor(executor, cpu_bound_calc, "task1"),
22            loop.run_in_executor(executor, cpu_bound_calc, "task2"),
23        )
24    print(f"cost time: {time.time() - start_time}")
25    print(results)
26
27
28if __name__ == "__main__":
29    asyncio.run(main())

实际运行结果(CPU性能不同可能结果不同):

1task1 cost time: 1.3992047309875488
2task2 cost time: 1.3943092823028564
3cost time: 1.588423728942871
4[0.6665390533899224, 0.6666580988367764]

一个彩蛋:

你可以尝试运行一下下面的代码,然后分析为什么带GIL的多线程下,总体耗时和单个任务的耗时接近。

 1import asyncio
 2import math
 3import random
 4import time
 5
 6
 7def cpu_bound_calc(task_name: str) -> float:
 8    start_time = time.time()
 9    length = 10**7
10    res = sum(math.sqrt(random.random()) for _ in range(length)) / length
11    print(f"{task_name} cost time: {time.time() - start_time}")
12    return res
13
14
15async def main():
16    start_time = time.time()
17    results = await asyncio.gather(
18        asyncio.to_thread(cpu_bound_calc, "task1"),
19        asyncio.to_thread(cpu_bound_calc, "task2"),
20    )
21    print(f"cost time: {time.time() - start_time}")
22    print(results)
23
24
25if __name__ == "__main__":
26    asyncio.run(main())

 提示

loop.run_in_executor()中函数执行出现异常,只能在loop.run_in_executor()外部用try ... except捕获。

TaskGroup

在 Python 3.11 中,官方引入了 asyncio.TaskGroup,这是一种更现代、更符合“结构化并发”理念的工具。相比于 asyncio.gather,它能更优雅地处理任务组的生命周期,尤其是在异常发生时。

TaskGroup 中的某个任务抛出异常时,它会自动取消组内其它尚未完成的任务。这避免了 gather 中可能出现的“孤儿 Task”跑飞的问题。

示例:

 1import asyncio
 2import time
 3
 4import httpx
 5
 6ASYNC_HTTPX_CLIENT = httpx.AsyncClient()
 7
 8
 9async def _get_index_length_single(url: str) -> int:
10    return len((await ASYNC_HTTPX_CLIENT.get(url)).text)
11
12
13async def get_index_length(urls: list[str]) -> list[int]:
14    time_start = time.time()
15
16    tasks = []
17    results = []
18
19    try:
20        async with asyncio.TaskGroup() as tg:
21            for url in urls:
22                tasks.append(tg.create_task(_get_index_length_single(url)))
23        results = [task.result() for task in tasks]
24    except ExceptionGroup as eg:
25        print(f"捕获到异常组:{eg}")
26        for e in eg.exceptions:
27            print(f"具体异常:{e}")
28
29    time_end = time.time()
30    print(f"URLs: {len(urls)}, Time: {time_end - time_start}")
31
32    return results
33
34
35async def main():
36    urls = [
37        "https://www.bing.com",
38        "https://www.baidu.com",
39        "https://www.sogou.com",
40        "https://www.zhihu.com",
41        "https://www.bilibili.com",
42    ]
43    print(await get_index_length(urls[:1] + ["https://bad.url"] * 2))
44    print(await get_index_length(urls[1:]))
45
46
47if __name__ == "__main__":
48    asyncio.run(main())

运行结果示例:

1捕获到异常组:unhandled errors in a TaskGroup (2 sub-exceptions)
2具体异常:[Errno 11001] getaddrinfo failed
3具体异常:[Errno 11001] getaddrinfo failed
4URLs: 3, Time: 0.01521611213684082
5[]
6URLs: 4, Time: 0.13132429122924805
7[28918, 19011, 118, 3286]

关于彩蛋

在run_in_executor的彩蛋中,之所以“两个运行在线程中的CPU密集型任务,单个任务的耗时和两个任务的总耗时接近”,不是因为我们关于GIL的理解出现了问题,而是单个任务耗时过久,在任务完成之前,Python就强行让任务让出了GIL,所以在宏观上,是两个任务交替执行的,每个任务的完成时间都被拉长到原来的两倍多。如果把10**7换成10**3,让单个任务在被迫让出GIL之前就执行完成,就可以看到两个任务的总耗时是单个任务耗时的两倍。