python并发与并行(三) ———— 利用Lock防止多个线程争用同一份数据
Published:
利用Lock防止多个线程争用同一份数据
了解到全局解释器锁(GIL)的效果之后,许多Python新手可能觉得没必要继续在代码里使用互斥锁(mutual-exclusion lock,mutex)了。既然GIL让Python线程没办法平行地运行在多个CPU核心上,那是不是就意味着它同时还会自动保护程序里面的数据结构,让我们不需要再加锁了?在列表与字典等结构上面测试过之后,有些人可能真的以为是这样的。
其实并非如此。GIL起不到这样的保护作用。虽说同一时刻只能有一条Python线程在运行,但这条线程所操纵的数据结构还是有可能遭到破坏,因为它在执行完当前这条字节码指令之后,可能会被Python系统切换走,等它稍后切换回来继续执行下一条字节码指令时,当前的数据或许已经与实际值脱节了,因为中途切换进来的其他线程可能更新过这个值。所以,多个线程同时访问同一个对象是很危险的。每条线程在操作这份数据时,都有可能遭到其他线程打扰,因此数据之中的固定关系或许已经被别的线程破坏了,这会令程序陷入混乱状态。
我们用一个程序来模拟传感器采集数据,然后使用多线程来统计最终传感器采集到的值。
这个Python代码示例主要演示了多线程环境下的并发问题,并使用了threading.Barrier来确保所有线程在同一时间点开始执行。代码的核心目的是展示在没有适当同步机制的情况下,多个线程同时更新共享资源(在这个例子中是一个简单的计数器)可能会导致数据不一致。
from threading import Barrier
BARRIER = Barrier(5)
from threading import Thread
class Counter:
def __init__(self):
self.count = 0
def increment(self, offset):
self.count += offset
def worker(sensor_index, how_many, counter):
# Barrier类,它允许一组线程在某个点上互相等待,直到所有线程都到达这个点(即达到指定的线程数量),然后它们才可以继续执行。
# BARRIER.wait()确保所有线程在开始计数之前都已经准备好,这样更容易触发并发问题(因为线程几乎同时开始更新共享资源)。
BARRIER.wait()
for _ in range(how_many):
counter.increment(1)
how_many = 10**5
counter = Counter()
threads = []
for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
Output:
Counter should be 500000, got 370684
由于这个示例没有使用任何锁或同步机制来保护对Counter实例的并发访问,因此你很可能会发现实际计数值与期望值不符。这就是所谓的“竞态条件”(race condition),它发生在多个线程同时访问和修改共享数据时。在实际应用中,为了避免这种情况,通常会使用锁(如互斥锁、读写锁等)或其他同步机制(如条件变量、信号量等)来确保数据的一致性和正确性。
Python解释器会保证多个线程可以公平地获得执行机会,或者说,保证每条线程所分配到的执行时间大致相等。为了实现这种效果,它会及时暂停某条线程,并且把另一条线程切换过来执行。然而问题是,我们并不清楚它具体会在什么时候暂停线程,万一这条线程正在执行的是一项本来不应该中断的原子操作(atomic operation),就会造成我们例子中这种错误的结果。
Counter对象的increment方法看上去很简单,工作线程在调用这个方法时,相当于是在执行下面这样一条语句:
counter.count += 1
然而,在对象的属性上面执行+=操作,实际上需要分成三个小的步骤。也就是说,Python解释器会把这一条语句分成三个语句来之行:
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)
这三个步骤本来应该一次执行完才对,但是Python系统有可能在任意两步之间,把当前这条线程切换走,这就导致这条线程在切换回来后,看到的是个已经过时的value值,它把这个过时的值通过setattr赋给Counter对象的count属性,从而使统计出来的样本总数偏小。
下面我们模拟一下多线程切换时的状态:
# Running in Thread A
value_a = getattr(counter, 'count')
# Context switch to Thread B
value_b = getattr(counter, 'count')
result_b = value_b + 1
setattr(counter, 'count', result_b)
# Context switch back to Thread A
result_a = value_a + 1
setattr(counter, 'count', result_a)
线程A在执行了第一步之后,还没来得及执行第二步,就被线程B打断了。等到线程B把它的三个步骤执行完毕后,线程A才重新获得执行机会。这时,它并不知道count已经被线程B更新过了,它仍然以为自己在第一步里读取到的那个value_a是正确的,于是线程A就给value_a加1并将结果(也就是result_a)赋给count属性。这实际上把线程B刚刚执行的那一次递增操作覆盖掉了。上面的传感器采样总数之所以出错,也正是这个原因所致。
为了避免数据争用,Python在内置的threading模块里提供了一套健壮的工具。其中最简单也最有用的是一个叫作Lock的类,它相当于互斥锁(mutex)。
通过这样的锁,我们可以确保多条线程有秩序地访问Counter类的count属性,使得该属性不会遭到破坏,因为线程必须先获取到这把锁,然后才能操纵count,而每次最多只能有一条线程获得该锁。下面,用with语句来实现加锁与解锁.
在这个修改后的代码中,我们引入了一个带有锁机制的计数器LockingCounter,以确保在多线程环境下对计数器进行安全地更新。下面是对修改后代码的详细解释:
from threading import Lock
from threading import Barrier
from threading import Thread
def worker(sensor_index, how_many, counter):
# Barrier类,它允许一组线程在某个点上互相等待,直到所有线程都到达这个点(即达到指定的线程数量),然后它们才可以继续执行。
# BARRIER.wait()确保所有线程在开始计数之前都已经准备好,这样更容易触发并发问题(因为线程几乎同时开始更新共享资源)。
BARRIER.wait()
for _ in range(how_many):
# increment(1)来增加计数器的值。由于LockingCounter使用了锁,因此每次只有一个线程能够修改计数器,从而防止了并发问题。
counter.increment(1)
how_many = 10**5
class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0
# increment 方法现在使用 with self.lock: 语句,这是一个上下文管理器,它确保了当线程尝试增加计数器时,会先获取锁。这防止了多个线程同时修改计数器,从而避免了数据竞争和不一致。
def increment(self, offset):
with self.lock:
self.count += offset
BARRIER = Barrier(5)
counter = LockingCounter()
for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
Output:
Counter should be 500000, got 500000
这个修改后的代码通过引入锁机制来确保在多线程环境下计数器的正确更新,从而避免了并发问题。
这个修改后的代码通过引入锁机制来确保在多线程环境下计数器的正确更新,从而避免了并发问题。