python并发与并行(五.1) ———— 不要在每次fan-out时都新建一批Thread实例

7 minute read

Published:

我们使用康威生命游戏的例子来解释这个专题。首先我们要实现一个康威生命游戏。

这是个经典的有限状态自动机。它的规则很简单: 在任意长宽的二维网格中,每个单元格都必须处于ALIVE或EMPTY状态, 前者表示这个单元格里有生命存在,后者表示这里没有生物(或者原有生物已经死亡)。

时钟每走一格,游戏就要前进一步。 这个时候,我们需要考虑每个单元格的周围总共有多少个处于存活状态的单元格, 并根据这个数量来决定本单元格的新状态: 如果当前有生命体存在(ALIVE),那么该生命体有可能继续存活,也有可能死亡; 如果单元格当前是空白的(EMPTY),那么下一步有可能继续保持空白,也有可能诞生新的生命体。

我们定义Grid类,一个简单的容器类管理这些单元格的状态。 Grid类必须提供get与set方法,以获取并设置任何一个坐标点 (或者说任何一个单元格)的值。如果坐标越界,那么应该自动绕回, 产生一种无限折返的效果。


ALIVE = '*'
EMPTY = '-'

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output

为了观察这个类的实际效果,我们创建Grid实例, 并采用经典的滑翔机(glider)形状来开局:

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)

Output:

---*-----
----*----
--***----
---------
---------

定义一个函数查询本单元格周边的八个单元格, 并统计其中有几个处于存活(ALIVE)状态。

给函数设计参数时,不应该让它明确接受Grid实例, 因为那样会导致这个函数与Grid类耦合。 只需要把一个能根据坐标来查询单元格状态的函数传给get参数即可.


def count_neighbors(y, x, get):
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

编写一段程序来做测试:


# 用来对count_neighbors做测试
alive = {(9, 5), (9, 6)}
seen = set()

def fake_get(y, x):
    position = (y, x)
    seen.add(position)
    return ALIVE if position in alive else EMPTY

count = count_neighbors(10, 5, fake_get)
assert count == 2


expected_seen = {
    (9, 5),  (9, 6),  (10, 6), (11, 6),
    (11, 5), (11, 4), (10, 4), (9, 4)
}
assert seen == expected_seen

现在来定义康威生命游戏的逻辑。

这套逻辑共有三条规则。

第一,如果单元格里有生命体,而且周边的存活单元格少于两个,那么本单元格里的生命体死亡;

第二,如果单元格里有生命体,而且周边的存活单元格多于三个,那么本单元格里的生命体死亡;

第三,如果单元格为空(或者说,单元格里面的生命体已经死亡),而且周边的存活单元格恰好是三个,那么本单元格里的生命体复活。

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state

assert game_logic(ALIVE, 0) == EMPTY
assert game_logic(ALIVE, 1) == EMPTY
assert game_logic(ALIVE, 2) == ALIVE
assert game_logic(ALIVE, 3) == ALIVE
assert game_logic(ALIVE, 4) == EMPTY
assert game_logic(EMPTY, 0) == EMPTY
assert game_logic(EMPTY, 1) == EMPTY
assert game_logic(EMPTY, 2) == EMPTY
assert game_logic(EMPTY, 3) == ALIVE
assert game_logic(EMPTY, 4) == EMPTY

下面编写一个函数,用来更改单元格的状态。这个函数调用前面的count_neighbors与game_logic`,这个函数负责根据坐标查出单元格当前的状态,然后统计周边总共有多少个存活的单元格,接下来根据当前状态与存活的邻居数量判断本单元格在下一轮的状态,最后,更新单元格的状态。

在设计这个接口时,也不允许传入Grid实例,而是传入一个能根据坐标来设置新状态的函数,以降低耦合度。


def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)


alive = {(10, 5), (9, 5), (9, 6)}
new_state = None

def fake_get(y, x):
    return ALIVE if (y, x) in alive else EMPTY

def fake_set(y, x, state):
    global new_state
    new_state = state

# Stay alive
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE

# Stay dead
alive.remove((10, 5))
step_cell(10, 5, fake_get, fake_set)
assert new_state == EMPTY

# Regenerate
alive.add((10, 6))
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE

最后,我们定义一个函数,把整张网格之中的每一个单元格都向前推进一步,并返回一张新的网格,用来表示下一代的状态。

在实现这个函数时,要调用刚才写的step_cell函数,这时必须注意把get与set参数写对。get指的是当前这代网格(grid)之中的get方法,而set指的则是下一代网格(next_grid)的set方法,只有这样,才能让每一个单元格都按照现在的情况分别演化到下一轮,而不会让先演化的单元格影响其他单元格的迁移结果,这对于游戏正常运行是很重要的。

假如设计step_cell时,让它只接受一个Grid实例,而不是分别通过两个参数来接受获取与设置单元格状态所用的那两个方法,那么这里的simulate就不好写了,我们若是把当前的grid传过去,那么它里面的单元格状态就会被step_cell函数破坏掉。

def simulate(grid):
    next_grid = Grid(grid.height, grid.width)
    for y in range(grid.height):
        for x in range(grid.width):
            step_cell(y, x, grid.get, next_grid.set)
    return next_grid

现在,通过for循环来推进这张网格(或者说棋盘),推进到第四代时,大家就会发现,原来那个滑翔机的形状已经整体向右下方移动了一个位置。

当然这个效果,最终还是通过game_logic函数里面那三条简单的规则而得以落实的。

class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate(grid)

print(columns)

到此为止,我们游戏的代码编写就结束了。完成的程序如下:


ALIVE = '*'
EMPTY = '-'

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output


grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)


def count_neighbors(y, x, get):
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

alive = {(9, 5), (9, 6)}
seen = set()

def fake_get(y, x):
    position = (y, x)
    seen.add(position)
    return ALIVE if position in alive else EMPTY

count = count_neighbors(10, 5, fake_get)
assert count == 2

expected_seen = {
    (9, 5),  (9, 6),  (10, 6), (11, 6),
    (11, 5), (11, 4), (10, 4), (9, 4)
}
assert seen == expected_seen


def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state

assert game_logic(ALIVE, 0) == EMPTY
assert game_logic(ALIVE, 1) == EMPTY
assert game_logic(ALIVE, 2) == ALIVE
assert game_logic(ALIVE, 3) == ALIVE
assert game_logic(ALIVE, 4) == EMPTY
assert game_logic(EMPTY, 0) == EMPTY
assert game_logic(EMPTY, 1) == EMPTY
assert game_logic(EMPTY, 2) == EMPTY
assert game_logic(EMPTY, 3) == ALIVE
assert game_logic(EMPTY, 4) == EMPTY


def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)

alive = {(10, 5), (9, 5), (9, 6)}
new_state = None

def fake_get(y, x):
    return ALIVE if (y, x) in alive else EMPTY

def fake_set(y, x, state):
    global new_state
    new_state = state

# Stay alive
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE

# Stay dead
alive.remove((10, 5))
step_cell(10, 5, fake_get, fake_set)
assert new_state == EMPTY

# Regenerate
alive.add((10, 6))
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE


def simulate(grid):
    next_grid = Grid(grid.height, grid.width)
    for y in range(grid.height):
        for x in range(grid.width):
            step_cell(y, x, grid.get, next_grid.set)
    return next_grid


class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate(grid)

print(columns)

Output:

---*-----
----*----
--***----
---------
---------

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

这个程序,在单机单线程的环境下,是没有问题的。但如果需求变了呢? 例如game_logic函数或许要执行某些I/O操作(例如要通过socket通信)。如果这是大型多人在线游戏(massively multiplayer online game,MMOG)的一部分,那么这些单元格可能分别对应全球各地的玩家,所以在迁移每个单元格的状态时,都要联网查询其他玩家的状态,这样可能必须要执行I/O操作。

这种需求应该如何实现呢?最简单的办法是,把执行阻塞式的I/O操作直接放在game_logic函数里面执行。

将单机的game_logic实现

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state

替换为如下实现:

def game_logic(state, neighbors):
    # Do some blocking input/output in here:
    data = my_socket.recv(100)

这种写法的问题在于,它会拖慢整个程序的速度。如果game_logic函数每次执行的I/O操作需要100毫秒才能完成(与国外的玩家通信一个来回,确实有可能需要这么长时间),那么把整张网格向前推进一代最少需要4.5秒,因为simulate函数在推进网格时,是一个一个单元格来计算的,它需要把这45个单元格按顺序计算一遍。这对于网络游戏来说,实在太慢,让人没耐心玩下去。

另外,这个方案也无法扩展,假如单元格的数量增加到一万,那么计算新一代网格所花的总时间就会超过15分钟。

若想降低延迟时间,应该平行地执行这些I/O操作,这样的话,无论网格有多大,都只需要100毫秒左右就能推进到下一代。针对每个工作单元开辟一条执行路径,这种模式叫作扇出(fan-out),对于本例来说,工作单元指的是网格中的单元格。然后,要等待这些并发的工作单元全部完工,才能执行下一个环节,这种模式叫作扇入(fan-in),对于本例来说,下一个环节指的是让整张网格进入新的一代。

fan-out与fan-in是最常见的两种并发协调(concurrency coordination)模式,前者用来生成一批新的并发单元,后者用来等待现有的并发单元全部完工。python提供了很多种实现fan-out与fan-in的方案。