提到线程必须了解两个术语:
• 并发Concurrency,系统具备处理多个任务的能力,它们的在执行在时间上重叠,但不一定同时发生。
• 并行Parallelism:多个任务利用多核CPU真正同时执行。
Python的线程是一个并发框架,线程并行运行的时候,每个线程执行代码的一部分,Python解释器在它们之间切换,将执行控制权交给每个线程。
理解线程并行
先简单举个例子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def threaded_function():
for number in range(3):
print(f"Printing from {threading.current_thread().name}. {number=}")
time.sleep(0.1)
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
for _ in range(4):
executor.submit(threaded_function)
先打印输出:
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2
启动了四个线程,可以观察到在Worker_0打印number=0之后 ,并不会立刻打印number=1,原因就在于要切换给其他线程运行,而且各个线程的运行顺序是不一样的。
如何做到这个的呢?因为python解析器会进行上下文切换,默认的间隔时间如下列代码:
import sys
sys.getswitchinterval()
0.005
5毫秒的间隔并不意味着线程会精确地每5毫秒切换一次,而是意味着解释器会在这些间隔内考虑切换到另一个线程,而代码中的sleep()是为了增加了在此期间发生上下文切换的可能性。
什么是线程安全
由于上下文切换,程序在多线程环境中运行时可能会表现出意外行为,这就导致线程不安全问题,而如果代码在多线程环境中运行时表现出确定性并产生期望的输出,那么它就被认为是线程安全的。
线程安全问题通常源于两个原因:
• 共享可变数据:线程共享父进程的内存,因此所有变量和数据结构在各线程之间是共享的。对这些共享数据进行修改时可能会引发错误。
• 非原子操作:多线程环境中,涉及多个步骤的操作可能会被上下文切换中断,尤其是在执行过程中切换到其他线程时,容易导致意外结果。
1:GIL
在讨论Python线程时,Python 3.12之前的GIL(Global Interpreter Lock)不可避免要提到。GIL是一个互斥锁,目的是保护Python对象的访问,防止多个线程同时执行Python字节码。它阻止了真正的线程并行,尤其在CPU密集型任务中,多线程性能会受到严重限制。不过,这意味着对于I/O密集型任务,线程并行仍然适用。
由于GIL的存在,当某个操作能在单个字节码指令中完成时,它是原子的。那么,Python是否因此天然线程安全?并非如此。因为I/O操作仍然可以并行执行,因此即使有GIL,访问共享可变数据时依然需要锁等同步机制确保线程安全。
GIL是否完全消除了Python的多线程并发能力?并没有,Python支持通过多进程来实现真正的并行。
值得关注的是,从Python 3.13开始,Python提供了无GIL的解释器,实现了真正的线程并行。但无论是否有GIL,编写代码时始终建议合理地保护线程安全——也就是说,不依赖GIL,主动保证线程安全。
2:竞争
现在来看看第二个核心概念,竞争条件发生在程序的结果依赖于不可控事件的顺序或时间,如线程执行顺序时,如果没有适当的同步会导致程序出现不可预测的错误。
下面的例子就来模拟这种情况,两个线程同时修改一个属性:
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def withdraw(self, amount):
if self.balance >= amount:
new_balance = self.balance - amount
time.sleep(0.1)
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(account.withdraw, 500)
executor.submit(account.withdraw, 700)
print(f"Final account balance: {account.balance}")
先猜可能的结果,代码可能会输出:
Final account balance: 300
也可能会输出:
Final account balance: 500
为什么会出现这样的情况呢?这就是由于线程执行顺序不一致导致的,如果先扣700,而第二个线程突然切换过来了,检查余额够,最终就扣了500,余额就变成500了,当然结果是错误的。
同步原语
为了解决线程不安全问题,Python的threading模块提供了各种同步原语,以防止竞争条件并允许线程之间的协调。
同步原语会:
• 控制线程同时执行代码块
• 使多个代码语句对线程来说是原子的
• 限制线程的并发访问
• 在线程之间进行协调,并根据其他线程的状态执行操作
接下去使用Python线程锁和信号实现互斥。
使用Python线程锁实现互斥
锁是一种同步原语,可用于独占访问资源,一旦一个线程获取了锁,其他线程就不能再获取它并继续执行,直到锁被释放,可以使用锁来封装应该原子执行的语句或语句组。
python提供两个lock相关的函数:
• 当一个线程调用.acquire()方法时,如果Lock对象已经被另一个线程锁定,那么调用的线程会被阻塞,直到持有锁的线程释放锁。
• release() 会释放一个被线程获取的锁,如果尝试释放一个未锁定的锁,会引发RuntimeError。
如果使用with语句,Lock 对象可用作上下文管理器,可以自动获取和释放锁。
为了解决上面代码存在的问题,可以:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.account_lock = threading.Lock()
def withdraw(self, amount):
with self.account_lock:
if self.balance >= amount:
new_balance = self.balance - amount
print(f"Withdrawing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
def deposit(self, amount):
with self.account_lock:
new_balance = self.balance + amount
print(f"Depositing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(account.withdraw, 700)
executor.submit(account.deposit, 1000)
executor.submit(account.withdraw, 300)
print(f"Final account balance: {account.balance}")
上述代码通过锁成功保证了线性安全。
如果由于代码中的错误或疏忽导致锁未正确释放,可能会导致死锁,即线程无限期地等待锁被释放。
死锁的原因包括:
• 嵌套锁获取:如果一个线程尝试获取它已经持有的锁,可能会发生死锁,同一线程尝试多次获取相同的锁会导致线程阻塞自身,这种情况在没有外部干预的情况下无法解决。
• 多重锁获取:当使用多个锁时,如果线程以不一致的顺序获取这些锁,可能会发生死锁,如果两个线程各自持有一个锁并等待对方释放锁,那么两个线程都无法继续,从而导致死锁。
对于多重锁可以使用可重入锁RLock解决,当持有线程再次请求锁时,它不会阻塞,允许线程在释放锁之前多次获取锁,这在递归函数或线程需要重新进入已锁定资源的情况下非常有用,相对来说,RLock因为要跟踪同一线程获取锁的次数,会有性能开销。
Semaphores信号量
在资源数量有限且多个线程尝试访问这些有限资源时非常有用,它使用一个计数器来限制多个线程对临界区的访问,每次调用.acquire() 都会将信号量的计数器减少一个,当计数器达到零时,再.acquire() 调用将被阻塞。
举一个例子,多个客户在银行等待有限数量的柜员服务:
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)
def now():
return time.strftime("%H:%M:%S")
def serve_customer(name):
print(f"{now()}: {name} is waiting for a teller.")
with teller_semaphore:
print(f"{now()}: {name} is being served by a teller.")
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} is done being served.")
customers = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=3) as executor:
for customer_name in customers:
thread = executor.submit(serve_customer, customer_name)
print(f"{now()}: All customers have been served.")
代码很好理解,当某个线程达到计数器上限后,它会被阻塞,直到其他线程在with语句中因为完成服务而释放,但不管怎么样,每次只有三个客户被服务。
参考:https://realpython.com/python-thread-lock/#using-python-threading-locks-for-mutual-exclusion