主题
多线程编程
threading是Python中实现多线程编程的标准模块。与多进程不同,多线程在同一个进程内运行,共享进程的内存空间,这让线程间通信更便捷,但也带来了同步问题。
这一章介绍threading模块的基本用法,包括线程创建、同步原语、线程安全等内容。
线程与进程的区别
理解线程和进程的区别是正确选择并发方案的前提。
进程是资源分配的基本单位,有独立的地址空间、文件描述符、堆内存。进程之间相互隔离,一个进程崩溃不会影响其他进程。但进程创建和切换开销大,进程间通信需要额外机制。
线程是CPU调度的基本单位,运行在同一进程的地址空间内,共享进程的堆内存和文件描述符。线程创建和切换开销小,线程间可以直接读写共享数据。但正因为共享内存,一个线程的错误可能导致整个进程崩溃。
Python的GIL让多线程无法真正并行执行。对于CPU密集型任务,多线程甚至可能比单线程更慢。但对于IO密集型任务,多线程仍然有效——线程在等待IO时可以释放GIL,让其他线程继续执行。
选择多线程还是多进程,取决于任务特性和性能要求。IO密集型任务适合多线程,CPU密集型任务如果需要利用多核则用多进程。
创建线程
threading模块提供了多种创建线程的方式。
创建线程的最基本方式是通过Thread类:
python
import threading
def worker(name):
print(f"Hello from {name}")
t = threading.Thread(target=worker, args=("Worker",))
t.start()
t.join()target参数指定线程执行的函数,args是传递给函数的参数。start()启动线程,join()等待线程结束。
给线程命名方便调试:
python
t = threading.Thread(target=worker, name="MyWorker")
print(t.name) # MyWorker也可以继承Thread类,重写run()方法:
python
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print(f"Running {self.name}")
t = MyThread(name="CustomThread")
t.start()
t.join()这种方式适合需要在线程中封装更多逻辑的场景。
线程同步原语
多个线程访问共享资源时,需要同步机制防止数据不一致。
Lock
Lock是最基本的锁,同一时刻只允许一个线程持有:
python
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock:
counter += 1with lock:语句确保同一时刻只有一个线程执行缩进内的代码。由于counter += 1不是原子操作,必须用锁保护。
RLock
RLock允许同一线程多次获取:
python
lock = threading.RLock()
with lock:
with lock: # 同一线程可以再次获取
passRLock允许持有锁的线程重复获取锁,避免自己锁死自己。代价是比Lock稍复杂,性能也略低。
Semaphore
Semaphore控制同时访问资源的线程数:
python
semaphore = threading.Semaphore(2)
def worker(n):
with semaphore:
print(f"Worker {n} acquired semaphore")
for i in range(3):
threading.Thread(target=worker, args=(i,)).start()Semaphore内部维护一个计数器,获取时减1,释放时加1。计数器为0时,获取操作阻塞。适合限制并发连接数。
Event
Event用于线程间信号通知:
python
event = threading.Event()
def waiter():
print("Waiting for event")
event.wait()
print("Event triggered!")
def setter():
print("Setting event")
event.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()wait()阻塞等待事件,set()触发事件。Event可以重复使用,clear()重置事件。
Condition
Condition结合了锁和等待通知机制:
python
condition = threading.Condition()
def consumer():
with condition:
while not data_ready:
condition.wait()
print("Consuming:", data)
def producer():
global data_ready, data
data = "produced"
data_ready = True
with condition:
condition.notify()wait()阻塞等待,notify()唤醒一个等待的线程,notify_all()唤醒所有等待的线程。Condition适合生产者-消费者模式。
线程安全的数据结构
Python的queue模块提供了线程安全的队列:
python
import queue
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"Got: {item}")
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t1.join()
q.put(None) # 发送结束信号
t2.join()Queue的get()和put()操作是线程安全的,不需要额外加锁。Queue还提供了join()方法等待队列清空。
线程池
concurrent.futures模块提供了线程池接口:
python
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for f in futures:
print(f.result())ThreadPoolExecutor管理线程池,自动复用线程。submit()提交任务返回Future对象,result()获取结果。
也可以用map()简化:
python
with ThreadPoolExecutor(max_workers=4) as executor:
results = executor.map(task, range(5))
print(list(results))map()按顺序返回结果,如果需要按完成顺序处理,用as_completed():
python
from concurrent.futures import as_completed
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for f in as_completed(futures):
print(f.result())GIL与线程性能
GIL是Python面试中的高频问题,理解它对正确使用多线程至关重要。
GIL的存在意味着同一时刻只有一个线程执行Python字节码。即使创建多个线程,在任意时刻只有一个线程真正运行。这意味着多线程无法加速CPU密集型任务。
python
import threading
import time
def cpu_task():
total = 0
for i in range(10**7):
total += i
start = time.time()
t1 = threading.Thread(target=cpu_task)
t2 = threading.Thread(target=cpu_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Time: {time.time() - start:.2f}") # 可能比单线程更慢对于IO密集型任务,多线程仍然有效:
python
import requests
def fetch(url):
return requests.get(url).status_code
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(fetch, url) for url in urls]当线程等待网络响应时,会释放GIL,让其他线程继续执行。因此多线程可以并发处理多个IO请求。
常见误区
第一个误区是以为多线程可以加速CPU密集型任务。由于GIL的存在,CPU密集型任务在多线程中反而可能更慢。如果需要加速CPU密集型任务,应该使用多进程。
第二个误区是在不需要的时候加锁。锁保护共享数据,但也会降低并发性。如果数据不被多线程共享,就不需要加锁。过度的锁反而降低性能。
第三个误区是忘记释放锁。使用with lock:自动释放,即使代码抛异常也会释放。避免手动lock.acquire()和lock.release()。
第四个误区是死锁。常见的死锁原因包括:持有多把锁时以不同顺序获取、忘记释放锁、线程相互等待。预防死锁的方法包括:固定加锁顺序、使用超时、使用RLock。
python
# 潜在死锁:两把锁以不同顺序获取
def task1():
with lock_a:
with lock_b:
pass
def task2():
with lock_b:
with lock_a:
pass线程与协程的选择
多线程和协程都可以实现并发,但适用场景不同。
多线程适合:需要真正并行执行(绕过GIL的场景,如IO等待期间)、需要利用多核CPU、任务相对独立、复杂计算配合C扩展释放GIL。
协程适合:高并发IO操作、需要极低并发开销、单线程内实现并发、避免锁和线程安全问题。
现代Python中,asyncio是处理高并发IO的首选方式。只有在asyncio无法满足需求时,才考虑多线程。