主题
异步编程
asyncio是Python 3.5引入的异步编程标准库,它基于事件循环,在单线程内实现并发。asyncio使用async/await语法,让异步代码看起来像同步代码,大大降低了异步编程的复杂度。
这一章介绍asyncio的核心概念和用法,包括协程、任务、事件循环等内容。
async与await
asyncio的基石是协程函数。用async def定义的函数是协程函数,调用它返回一个协程对象:
python
async def fetch_data():
return "data"
coro = fetch_data()
print(coro) # <coroutine object fetch_data at 0x...>协程函数和普通函数不同:调用协程函数只是创建了协程对象,不会真正执行。要执行协程,必须用asyncio.run()或者在已有事件循环中用await:
python
import asyncio
async def main():
result = await fetch_data()
print(result)
asyncio.run(main())asyncio.run()创建新的事件循环,运行协程,然后关闭循环。这是最常用的启动协程的方式。
await关键字有两个作用:暂停当前协程,等待另一个协程完成。暂停期间,事件循环可以运行其他协程。
python
async def task1():
print("Task 1 start")
await asyncio.sleep(1)
print("Task 1 end")
async def task2():
print("Task 2 start")
await asyncio.sleep(0.5)
print("Task 2 end")
async def main():
await task1()
await task2()
asyncio.run(main())这个例子中,task1和task2是顺序执行的。如果让它们并发执行:
python
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())用asyncio.gather()可以并发运行多个协程。总耗时是1秒(最长任务的耗时),而不是1.5秒。
事件循环
事件循环是asyncio的核心。它是一个无限循环,监听IO事件,在等待期间执行可运行的任务。
asyncio.run()自动创建和管理事件循环。在高级用法中,可能需要手动控制事件循环:
python
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()这在需要在同一事件循环中运行多个任务的场景下很有用。但大多数情况下,asyncio.run()就够了。
事件循环可以注册回调函数:
python
def callback(name):
print(f"Callback {name}")
loop = asyncio.new_event_loop()
loop.call_soon(callback, "first")
loop.call_soon(callback, "second")
loop.run_until_complete(asyncio.sleep(0))call_soon()在下一个事件循环迭代时执行回调。call_later()延迟一段时间后执行。
Task与Future
Task是Future的子类,代表一个可等待的对象,可以在事件循环中并发执行。
asyncio.create_task()创建Task并安排其执行:
python
async def main():
task = asyncio.create_task(fetch_data())
print("Task created")
result = await task
print(f"Result: {result}")
asyncio.run(main())task在创建后立即开始执行,不需要等待。创建task和等待task之间,协程可能在后台运行。
asyncio.ensure_future()也可以创建task,是旧API但仍广泛使用:
python
task = asyncio.ensure_future(fetch_data())Future代表一个异步操作的最终结果。可以手动创建Future并在其完成后设置结果:
python
async def with_future():
future = asyncio.Future()
def set_result():
future.set_result("Done")
loop.call_soon(set_result)
result = await future
print(result)通常不需要直接操作Future,asyncio的API会处理它。
并发执行
asyncio.gather()是最常用的并发执行工具:
python
async def fetch(url):
await asyncio.sleep(0.5)
return url
async def main():
urls = ['a', 'b', 'c']
results = await asyncio.gather(*[fetch(url) for url in urls])
print(results)
asyncio.run(main())asyncio.gather()按顺序返回结果列表,总耗时是所有任务中最长任务的耗时。
asyncio.wait()等待多个task完成,返回(done, pending)元组:
python
async def main():
task1 = asyncio.create_task(task1())
task2 = asyncio.create_task(task2())
done, pending = await asyncio.wait([task1, task2])
for t in done:
print(t.result())asyncio.as_completed()按完成顺序返回结果:
python
async def main():
tasks = [asyncio.create_task(fetch(url)) for url in urls]
for future in asyncio.as_completed(tasks):
result = await future
print(result)异步生成器与迭代器
异步生成器用async def定义,使用yield产出值:
python
async def async_gen():
for i in range(3):
await asyncio.sleep(0.1)
yield i
async def main():
async for value in async_gen():
print(value)
asyncio.run(main())异步迭代器需要实现__aiter__和__anext__方法:
python
class AsyncCounter:
def __init__(self, n):
self.n = n
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.n:
raise StopAsyncIteration
value = self.current
self.current += 1
await asyncio.sleep(0.1)
return value
async def main():
async for i in AsyncCounter(3):
print(i)
asyncio.run(main())async for自动调用__anext__,并在遇到StopAsyncIteration时停止。
异步上下文管理器
异步上下文管理器用async with语句:
python
async def main():
async with async_resource() as resource:
await resource.use()异步上下文管理器需要实现__aenter__和__aexit__方法:
python
class async_resource:
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.release()
return Falseaiohttp库的ClientSession就是异步上下文管理器的典型应用:
python
import aiohttp
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()async with确保资源在使用后被正确清理,即使在异步操作中发生异常也会执行清理逻辑。
异常处理
协程中的异常需要特别处理,否则可能被静默忽略:
python
async def risky():
raise ValueError("Oops!")
async def main():
try:
await risky()
except ValueError as e:
print(f"Caught: {e}")
asyncio.run(main())用try/except包装await。
用asyncio.gather()时,可以收集所有结果或异常:
python
async def main():
results = await asyncio.gather(
safe_task(),
risky_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} raised: {result}")
else:
print(f"Task {i} result: {result}")return_exceptions=True让gather不抛出异常,而是将异常作为结果返回。
取消任务
可以取消正在运行的任务:
python
async def long_task():
try:
while True:
print("Working...")
await asyncio.sleep(0.5)
except asyncio.CancelledError:
print("Task was cancelled")
raise
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(1.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(main())task.cancel()请求取消任务。任务需要在await点检查取消状态,或者捕获CancelledError并重新抛出。
可以用asyncio.shield()保护任务不被取消:
python
async def main():
protected = asyncio.create_task(shield(some_task()))
await asyncio.sleep(1)
protected.cancel()
try:
await protected
except asyncio.CancelledError:
print("Protected task was cancelled despite shield")shield()确保被保护的任务不会被外部取消请求取消。
常见误区
第一个误区是忘记启动协程。协程如果不用await调用或者不用asyncio.run()运行,什么都不会发生:
python
async def hello():
print("Hello!")
hello() # 什么都不打印
asyncio.run(hello()) # 真正运行第二个误区是在同步代码中使用await。await只能在协程函数中使用。如果需要在同步代码中运行协程,必须用asyncio.run()。
第三个误区是混淆并发和并行。asyncio是单线程并发,不是多线程并行。多个协程在同一个线程中交替执行。如果需要真正的并行计算,应该用multiprocessing。
第四个误区是在异步函数中使用阻塞调用。time.sleep()会阻塞整个事件循环,应该用await asyncio.sleep():
python
async def wrong():
time.sleep(1) # 阻塞事件循环
async def right():
await asyncio.sleep(1) # 让出控制权如果必须使用阻塞调用,可以用loop.run_in_executor()在线程池中执行:
python
async def with_blocking():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, blocking_function, arg)异步与线程的结合
asyncio可以与线程结合使用,处理不支持异步的库:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
time.sleep(1)
return "Done"
async def main():
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)
result = await loop.run_in_executor(executor, blocking_io)
print(result)
executor.shutdown()run_in_executor()在线程池中执行阻塞函数,不阻塞事件循环。
也可以从协程中启动线程:
python
import asyncio
import threading
def thread_worker():
time.sleep(1)
print("Thread done")
async def main():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, thread_worker)
asyncio.run(main())这种方式适合逐步将同步代码迁移到异步。