python并发与并行(七) ———— 如果必须用线程做并发,那就考虑通过ThreadPoolExecutor实现

3 minute read

Published:

Python有个内置模块叫作concurrent.futures,它提供了ThreadPoolExecutor类。这个类结合了线程(Thread)方案与队列(Queue)方案的优势,可以用来平行地处理康威生命游戏里的那种I/O操作(参见前面讲的线程方案和队列方案)。

我们把之前的代码拿过来进行更改。

# Example 1
ALIVE = '*'
EMPTY = '-'

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output

from threading import Lock

class LockingGrid(Grid):
    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            return super().__str__()

    def get(self, y, x):
        with self.lock:
            return super().get(y, x)

    def set(self, y, x, state):
        with self.lock:
            return super().set(y, x, state)

def count_neighbors(y, x, get):
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

# def game_logic(state, neighbors):
#     # Do some blocking input/output in here:
#     data = my_socket.recv(100)

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state

def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)

这次在把游戏推进到下一代的时候,我们不针对每个单元格启动新的Thread实例,而是把推进每个单元格状态的那个函数与必要的参数提交给ThreadPoolExecutor,让执行器自己安排线程去执行这些状态更新任务,这样就实现了fan-out(分派)。稍后,可以等待提交过去的所有任务都执行完毕,然后再把整张网格正式推进到下一代,这样就实现了fan-in(归集)。

from concurrent.futures import ThreadPoolExecutor

def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)

    futures = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            future = pool.submit(step_cell, *args)  # Fan out
            futures.append(future)

    for future in futures:
        future.result()                             # Fan in

    return next_grid

用来推进游戏状态的这些线程可以提前分配,不用每次执行simulate_pool都分配一遍,这样能够降低启动线程的开销。另外,线程池里的最大线程数可以通过max_workers参数手工指定,这样能把线程数量限制在一定范围内,而不像最早的那个方案那样,每执行一项I/O操作,就启动一条线程,那样会导致内存用量激增。

class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)

grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
    for i in range(5):
        columns.append(str(grid))
        grid = simulate_pool(pool, grid)

print(columns)

ThreadPoolExecutor类的最大优点在于:如果调用者通过submit方法把某项任务提交给它执行,那么会获得一个与该任务相对应的Future实例,当调用者在这个实例上通过result方法获取执行结果时,ThreadPoolExecutor会把它在执行任务的过程中所遇到的异常自动抛给调用者。

ThreadPoolExecutor方案仍然有个很大的缺点,就是I/O并行能力不高,即便把max_workers设成100,也无法高效地应对那种有一万多个单元格,且每个单元格都要同时做I/O的情况。如果你面对的需求,没办法用异步方案解决,而是必须执行完才能往后走(例如文件I/O),那么ThreadPoolExecutor是个不错的选择。

我们同样按照之前的修改,把Gird size改为(500,900),迭代次数100次,线程数设置为5,结果为:

 2%|█▍                                                                     | 2/100 [00:17<14:34,  8.92s/it]

比之前的结果都要离谱。

我们总结一下之前使用方法的性能:

# | 配置
性能(s) | 单线程 | 线程方案实现多线程 | 队列方案实现多线程 | ThreadPoolExecutor方案实现多线程 | |————————————-|——————–|——————-|——————–|—————————| | Grid(500,900)
step 100次
线程数 5 | 55.45792198181152 | | 170.44810271263123 | 6410.6107659339905 | | Grid(50,90)
step 100次
线程数 5 | 0.6782510280609131 | 14.29249095916748 | 2.5422348976135254 | 5.175195217132568 |

可以看到ThreadPoolExcutoThreadPoolThreadPoolExcutor的并发性能并不高。