python并发与并行(二) ———— 用线程执行阻塞式IO,但不要用它做并行计算

1 minute read

Published:

用线程执行阻塞式IO,但不要用它做并行计算

Python语言的标准实现叫作CPython,它分两步来运行Python程序。首先解析源代码文本,并将其编译成字节码(bytecode)。字节码是一种底层代码,可以把程序表示成8位的指令(从Python 3.6开始,这种底层代码实际上已经变成16位了,所以应该叫作wordcode才对,但基本原理依然相同)。然后,CPython采用基于栈的解释器来运行字节码。这种字节码解释器在执行Python程序的过程中,必须确保相关的状态不受干扰,所以CPython会用一种叫作全局解释器锁(global interpreter lock,GIL)的机制来保证这一点。

GIL实际上就是一种互斥锁(mutual-exclusion lock,mutex),用来防止CPython的状态在抢占式的多线程环境(preemptive multithreading)之中受到干扰,因为在这种环境下,一条线程有可能突然打断另一条线程抢占程序的控制权。如果这种抢占行为来得不是时候,那么解释器的状态(例如为垃圾回收工作而设立的引用计数等)就会遭到破坏。所以,CPython要通过GIL阻止这样的动作,以确保它自身以及它的那些C扩展模块能够正确地执行每一条字节码指令。 但是,GIL会产生一个很不好的影响。在C++与Java这样的语言里面,如果程序之中有多个线程能够分头执行任务,那么就可以把CPU的各个核心充分地利用起来。尽管Python也支持多线程,但这些线程受GIL约束,所以每次或许只能有一条线程向前推进,而无法实现多头并进。所以,想通过多线程做并行计算或是给程序提速的开发者,恐怕要失望了。

我们用一段计算量很大的任务来看一下python在用多线程执行计算密集型任务时的表现。


# 因数分解算法
def factorize(number):
    for i in range(1,number+1):
        if number %i==0:
            yield i

import time

numbers = [2139079, 1214759, 1516637, 1852285]
start = time.time()

for number in numbers:
    list(factorize(number))

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')

from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))

start = time.time()

threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

# thread.join()方法的作用是等待线程完成。当你启动一个线程后,这个线程会异步执行。如果你希望主线程(通常是执行thread.start()的线程)等待这个新线程完成其任务后再继续执行,你就需要调用thread.join()。
# 如果不调用thread.join(),主线程可能会在其他线程完成之前继续执行,这可能导致一些不可预测的行为或资源访问冲突,特别是当多个线程需要访问共享资源时。通过调用join(),你确保了主线程会等待每个工作线程完成其执行,从而实现线程间的同步。
for thread in threads:
    thread.join()

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')

Output:

Took 0.179 seconds
Took 0.158 seconds

我们看结果,多线程并没有比单线程快很多。

下面我们看个IO密集型的任务。


import select
import socket
import time
from threading import Thread

#select.select,这是一个系统调用,用于监视文件描述符集合的变化情况。具体来说,select 函数可以监视三种类型的文件描述符集合:
#可读集合(readfds):等待数据变得可读(例如,网络套接字上有数据可读)的文件描述符集合。
#可写集合(writefds):等待数据变得可写(例如,套接字缓冲区有足够的空间可以发送数据)的文件描述符集合。
#异常集合(exceptfds):等待异常情况(如带外数据到达)的文件描述符集合。
#select.select 函数的最后一个参数是一个超时值,表示 select 函数等待事件发生的最长时间。在这个例子中,超时值被设置为 0.1 秒,这意味着 select 会在 0.1 秒后超时,无论是否有事件发生。
def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)

start = time.time()

for _ in range(5):
    slow_systemcall()

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')

start = time.time()

threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

# 用此函数来模拟在执行系统调用时,我们还同时在做其他的事情
def compute_helicopter_location(index):
    print('Running compute helicopter location ')

for i in range(5):
    compute_helicopter_location(i)

for thread in threads:
    thread.join()

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')

Output:

Took 0.517 seconds
Running compute helicopter location 
Running compute helicopter location 
Running compute helicopter location 
Running compute helicopter location 
Running compute helicopter location 
Took 0.108 seconds

与依次执行系统调用的那种写法相比,这种写法的速度几乎能达到原来的5倍。这说明,尽管那5条线程依然受GIL制约,但它们所发起的系统调用是可以各自向前执行的。GIL只不过是让Python内部的代码无法平行推进而已,至于系统调用,则不会受到影响,因为Python线程在即将执行系统调用时,会释放GIL,待完成调用之后,才会重新获取它。

与依次执行系统调用的那种写法相比,这种写法的速度几乎能达到原来的5倍。这说明,尽管那5条线程依然受GIL制约,但它们所发起的系统调用是可以各自向前执行的。GIL只不过是让Python内部的代码无法平行推进而已,至于系统调用,则不会受到影响,因为Python线程在即将执行系统调用时,会释放GIL,待完成调用之后,才会重新获取它。