python并发与并行(五.2) ———— 不要在每次fan-out时都新建一批Thread实例

25 minute read

Published:

想在Python里平行地做I/O,首先要考虑的工具当然是线程。但如果真用线程来表示fan-out模式中的执行路径,你就会发现,这样其实有很多问题。

我们用上一篇文章的conway game的实现来举例说明。

我们用线程来解决game_logic函数由于执行I/O而产生的延迟问题。首先,这些线程之间需要锁定功能对其进行协调,以确保它们所操纵的数据结构不会遭到破坏。

下面创建Grid子类,并为它添加锁定功能,让多条线程能够正确地访问同一个实例。


from threading import Lock

# Cell-state markers: stored in the grid rows and emitted verbatim when a grid is printed.
ALIVE = '*'
EMPTY = '-'

class Grid:
    """Fixed-size 2-D board of cell states whose coordinates wrap around.

    ``height`` and ``width`` are the dimensions given at construction;
    ``rows`` is a list of ``height`` lists, each holding ``width`` cell
    states (initially ``EMPTY``).
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One fresh list per row, so rows never alias each other.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Return the board as text, one newline-terminated line per row."""
        return ''.join(''.join(row) + '\n' for row in self.rows)

class LockingGrid(Grid):
    """Grid whose reads, writes and text rendering each hold one shared lock.

    Every operation acquires ``self.lock`` for its whole duration and then
    delegates to the ``Grid`` implementation.
    """

    def __init__(self, height, width):
        super().__init__(height, width)
        # A plain (non-reentrant) Lock guarding all access to the cells.
        self.lock = Lock()

    def get(self, y, x):
        """Return the state at ``(y, x)`` while holding the lock."""
        with self.lock:
            state = super().get(y, x)
        return state

    def set(self, y, x, state):
        """Store ``state`` at ``(y, x)`` while holding the lock."""
        with self.lock:
            result = super().set(y, x, state)
        return result

    def __str__(self):
        """Render the grid as text while holding the lock."""
        with self.lock:
            text = super().__str__()
        return text

接下来,改用fan-out模式实现simulate函数,为每个step_cell操作都创建一条线程,这些线程能够平行地运行并各自执行相应的I/O任务。这样的话,我们就不需要像原来那样,非得等前一次step_cell执行完,才能更新下一个单元格。然后,通过fan-in模式等待这些线程全部完工,再把网格正式演化到下一代。

from threading import Thread

def count_neighbors(y, x, get):
    """Count how many of the eight cells surrounding ``(y, x)`` are alive.

    Args:
        y: Row index of the centre cell.
        x: Column index of the centre cell.
        get: Callable ``get(row, col)`` returning the state of a cell.

    Returns:
        The number of neighbouring cells whose state equals ``ALIVE``.
    """
    # (dy, dx) steps, clockwise starting from north.
    offsets = (
        (-1, 0),   # North
        (-1, 1),   # Northeast
        (0, 1),    # East
        (1, 1),    # Southeast
        (1, 0),    # South
        (1, -1),   # Southwest
        (0, -1),   # West
        (-1, -1),  # Northwest
    )
    return sum(1 for dy, dx in offsets if get(y + dy, x + dx) == ALIVE)

def game_logic(state, neighbors):
    """Illustrative placeholder: a rule function that blocks on I/O.

    NOTE(review): ``my_socket`` is not defined in the visible code, so calling
    this version would raise ``NameError``. It is also shadowed by the later
    ``game_logic`` definition of the same name, and as written it has no
    ``return`` statement.
    """
    # Do some blocking input/output in here:
    data = my_socket.recv(100)

# To exercise the multithreaded code we still use the plain, non-blocking
# rule logic from the single-threaded version.
def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    A live cell with fewer than two or more than three live neighbours
    becomes ``EMPTY``; a non-live cell with exactly three becomes ``ALIVE``;
    in every other case ``state`` is returned unchanged.
    """
    if state != ALIVE:
        # Regenerate only on exactly three neighbours.
        return ALIVE if neighbors == 3 else state
    if neighbors < 2 or neighbors > 3:
        return EMPTY  # Die: too few or too many
    return state

def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and pass it to ``set``.

    Args:
        y: Row index of the cell.
        x: Column index of the cell.
        get: Callable ``get(row, col)`` used to read the current generation.
        set: Callable ``set(row, col, state)`` that receives the new state
            (the parameter name shadows the builtin inside this function).
    """
    current = get(y, x)
    live_neighbors = count_neighbors(y, x, get)
    set(y, x, game_logic(current, live_neighbors))

def simulate_threaded(grid):
    """Advance ``grid`` by one generation, using one thread per cell.

    Args:
        grid: Object exposing ``height``, ``width`` and ``get(y, x)``.

    Returns:
        A new ``LockingGrid`` of the same size holding the next generation.
    """
    successor = LockingGrid(grid.height, grid.width)

    workers = []
    for row in range(grid.height):
        for col in range(grid.width):
            # Fan out: each cell is stepped in its own, immediately started thread.
            worker = Thread(
                target=step_cell,
                args=(row, col, grid.get, successor.set),
            )
            worker.start()
            workers.append(worker)

    # Fan in: wait for every cell before handing back the new grid.
    for worker in workers:
        worker.join()

    return successor

负责推进单元格状态的step_cell函数可以保持原样,推动整个游戏流程的那些代码基本上也不用改,只有两个地方必须调整:一个是把网格类的名称改为LockingGrid,另一个是把simulate函数改为多线程版本的simulate_threaded。

class ColumnPrinter:
    """Collect multi-line strings and render them side by side.

    Each appended string becomes one column. ``str()`` produces a header row
    with the column indices, followed by the columns' lines joined with
    ``' | '``.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add ``data`` (a possibly multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row and all column lines as one string.

        The header centres each column index over the width of that column's
        first line, so it stays aligned for even widths and multi-digit
        indices whenever the index fits. Columns with fewer lines than the
        tallest one (including empty strings) are padded with blanks instead
        of raising ``IndexError``.
        """
        # Split every column once rather than once per output cell.
        split_columns = [data.splitlines() for data in self.columns]
        widths = [len(lines[0]) if lines else 0 for lines in split_columns]
        body_height = max((len(lines) for lines in split_columns), default=0)

        header = [str(i).center(width) for i, width in enumerate(widths)]
        rows = [' | '.join(header)]
        for j in range(body_height):
            cells = [
                lines[j] if j < len(lines) else ' ' * width
                for lines, width in zip(split_columns, widths)
            ]
            rows.append(' | '.join(cells))
        return '\n'.join(rows)

# Seed a 5x9 locking grid with five live cells.
grid = LockingGrid(5, 9)            # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

# Record each generation's text, then advance with the threaded simulator.
columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed

# Print the five recorded generations side by side.
print(columns)

完整的代码如下:

from threading import Lock

# Cell-state markers: stored in the grid rows and emitted verbatim when a grid is printed.
ALIVE = '*'
EMPTY = '-'

class Grid:
    """Fixed-size 2-D board of cell states whose coordinates wrap around.

    ``height`` and ``width`` are the dimensions given at construction;
    ``rows`` is a list of ``height`` lists, each holding ``width`` cell
    states (initially ``EMPTY``).
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One fresh list per row, so rows never alias each other.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Return the board as text, one newline-terminated line per row."""
        return ''.join(''.join(row) + '\n' for row in self.rows)

class LockingGrid(Grid):
    """Grid whose reads, writes and text rendering each hold one shared lock.

    Every operation acquires ``self.lock`` for its whole duration and then
    delegates to the ``Grid`` implementation.
    """

    def __init__(self, height, width):
        super().__init__(height, width)
        # A plain (non-reentrant) Lock guarding all access to the cells.
        self.lock = Lock()

    def get(self, y, x):
        """Return the state at ``(y, x)`` while holding the lock."""
        with self.lock:
            state = super().get(y, x)
        return state

    def set(self, y, x, state):
        """Store ``state`` at ``(y, x)`` while holding the lock."""
        with self.lock:
            result = super().set(y, x, state)
        return result

    def __str__(self):
        """Render the grid as text while holding the lock."""
        with self.lock:
            text = super().__str__()
        return text


# Example 2
from threading import Thread

def count_neighbors(y, x, get):
    """Count how many of the eight cells surrounding ``(y, x)`` are alive.

    Args:
        y: Row index of the centre cell.
        x: Column index of the centre cell.
        get: Callable ``get(row, col)`` returning the state of a cell.

    Returns:
        The number of neighbouring cells whose state equals ``ALIVE``.
    """
    # (dy, dx) steps, clockwise starting from north.
    offsets = (
        (-1, 0),   # North
        (-1, 1),   # Northeast
        (0, 1),    # East
        (1, 1),    # Southeast
        (1, 0),    # South
        (1, -1),   # Southwest
        (0, -1),   # West
        (-1, -1),  # Northwest
    )
    return sum(1 for dy, dx in offsets if get(y + dy, x + dx) == ALIVE)

# def game_logic(state, neighbors):
#     # Do some blocking input/output in here:
#     data = my_socket.recv(100)

def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    A live cell with fewer than two or more than three live neighbours
    becomes ``EMPTY``; a non-live cell with exactly three becomes ``ALIVE``;
    in every other case ``state`` is returned unchanged.
    """
    if state != ALIVE:
        # Regenerate only on exactly three neighbours.
        return ALIVE if neighbors == 3 else state
    if neighbors < 2 or neighbors > 3:
        return EMPTY  # Die: too few or too many
    return state

def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and pass it to ``set``.

    Args:
        y: Row index of the cell.
        x: Column index of the cell.
        get: Callable ``get(row, col)`` used to read the current generation.
        set: Callable ``set(row, col, state)`` that receives the new state
            (the parameter name shadows the builtin inside this function).
    """
    current = get(y, x)
    live_neighbors = count_neighbors(y, x, get)
    set(y, x, game_logic(current, live_neighbors))

def simulate_threaded(grid):
    """Advance ``grid`` by one generation, using one thread per cell.

    Args:
        grid: Object exposing ``height``, ``width`` and ``get(y, x)``.

    Returns:
        A new ``LockingGrid`` of the same size holding the next generation.
    """
    successor = LockingGrid(grid.height, grid.width)

    workers = []
    for row in range(grid.height):
        for col in range(grid.width):
            # Fan out: each cell is stepped in its own, immediately started thread.
            worker = Thread(
                target=step_cell,
                args=(row, col, grid.get, successor.set),
            )
            worker.start()
            workers.append(worker)

    # Fan in: wait for every cell before handing back the new grid.
    for worker in workers:
        worker.join()

    return successor


# Example 3
class ColumnPrinter:
    """Collect multi-line strings and render them side by side.

    Each appended string becomes one column. ``str()`` produces a header row
    with the column indices, followed by the columns' lines joined with
    ``' | '``.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add ``data`` (a possibly multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row and all column lines as one string.

        The header centres each column index over the width of that column's
        first line, so it stays aligned for even widths and multi-digit
        indices whenever the index fits. Columns with fewer lines than the
        tallest one (including empty strings) are padded with blanks instead
        of raising ``IndexError``.
        """
        # Split every column once rather than once per output cell.
        split_columns = [data.splitlines() for data in self.columns]
        widths = [len(lines[0]) if lines else 0 for lines in split_columns]
        body_height = max((len(lines) for lines in split_columns), default=0)

        header = [str(i).center(width) for i, width in enumerate(widths)]
        rows = [' | '.join(header)]
        for j in range(body_height):
            cells = [
                lines[j] if j < len(lines) else ' ' * width
                for lines, width in zip(split_columns, widths)
            ]
            rows.append(' | '.join(cells))
        return '\n'.join(rows)

# Seed a 5x9 locking grid with five live cells.
grid = LockingGrid(5, 9)            # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
import time
# Wall-clock start; the measured span covers rendering plus the five threaded generations.
st=time.time()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed
print(f'simulation time: {time.time() - st}')
print(columns)

Output:

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

这样写没错,而且I/O操作确实能够平行地在各线程中执行。但是,这种方案有三个比较大的缺点。

第一,要专门采用工具来协调这些Thread实例,才能保证它们不会破坏程序中的数据。这会让多线程版本的代码理解起来比较困难,因为早前的单线程版是按照先后顺序实现的,不需要有专门的协调机制,所以读起来比现在的版本好懂。这种复杂的多线程代码,随时间的推移更加难以扩展与维护。

第二,线程占用的内存比较多,每条线程大约占8MB。当然,现在的许多电脑完全能承担起本例中45条线程所占用的内存总量。但如果游戏里有一万个单元格,那就必须创建一万条线程,这个内存量恐怕很多电脑不支持。所以,给每项操作都新开一条线程是不现实的。

第三,线程的开销比较大。系统频繁地切换线程,会降低程序的运行效率。对于本例来说,每推进一代,就要停止一批旧线程并启动一批新线程,这样开销很大,于是程序除了要花100毫秒等待I/O操作结束,还会因为停止并启动线程而耽误一些时间。

每次都手工创建一批线程,是有很多缺点的,例如:创建并运行大量线程时的开销比较大,每条线程的内存占用量比较多,而且还必须采用Lock等机制来协调这些线程。

下面我们看一下把grid的尺寸变大,同时把迭代次数变多,多线程和上一节的单线程的时间开销对比。

我们统一把grid的大小设置为(50,90)。

# Multithreaded version
# NOTE(review): the two setups below are alternative excerpts, one per program;
# run together as written, the second assignment to `grid` replaces the first.
grid = LockingGrid(50, 90)            # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
# Single-threaded version
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)

同时迭代次数设置为100次。

# Multithreaded version
# NOTE(review): alternative excerpts, one per program; `columns` and `simulate`
# are defined in the complete listings, not in this fragment.
for i in range(100):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed
# Single-threaded version
for i in range(100):
    columns.append(str(grid))
    grid = simulate(grid)

完整的代码如下:

多线程版本

from threading import Lock

# Cell-state markers: stored in the grid rows and emitted verbatim when a grid is printed.
ALIVE = '*'
EMPTY = '-'

class Grid:
    """Fixed-size 2-D board of cell states whose coordinates wrap around.

    ``height`` and ``width`` are the dimensions given at construction;
    ``rows`` is a list of ``height`` lists, each holding ``width`` cell
    states (initially ``EMPTY``).
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One fresh list per row, so rows never alias each other.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Return the board as text, one newline-terminated line per row."""
        return ''.join(''.join(row) + '\n' for row in self.rows)

class LockingGrid(Grid):
    """Grid whose reads, writes and text rendering each hold one shared lock.

    Every operation acquires ``self.lock`` for its whole duration and then
    delegates to the ``Grid`` implementation.
    """

    def __init__(self, height, width):
        super().__init__(height, width)
        # A plain (non-reentrant) Lock guarding all access to the cells.
        self.lock = Lock()

    def get(self, y, x):
        """Return the state at ``(y, x)`` while holding the lock."""
        with self.lock:
            state = super().get(y, x)
        return state

    def set(self, y, x, state):
        """Store ``state`` at ``(y, x)`` while holding the lock."""
        with self.lock:
            result = super().set(y, x, state)
        return result

    def __str__(self):
        """Render the grid as text while holding the lock."""
        with self.lock:
            text = super().__str__()
        return text


# Example 2
from threading import Thread

def count_neighbors(y, x, get):
    """Count how many of the eight cells surrounding ``(y, x)`` are alive.

    Args:
        y: Row index of the centre cell.
        x: Column index of the centre cell.
        get: Callable ``get(row, col)`` returning the state of a cell.

    Returns:
        The number of neighbouring cells whose state equals ``ALIVE``.
    """
    # (dy, dx) steps, clockwise starting from north.
    offsets = (
        (-1, 0),   # North
        (-1, 1),   # Northeast
        (0, 1),    # East
        (1, 1),    # Southeast
        (1, 0),    # South
        (1, -1),   # Southwest
        (0, -1),   # West
        (-1, -1),  # Northwest
    )
    return sum(1 for dy, dx in offsets if get(y + dy, x + dx) == ALIVE)

# def game_logic(state, neighbors):
#     # Do some blocking input/output in here:
#     data = my_socket.recv(100)

def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    A live cell with fewer than two or more than three live neighbours
    becomes ``EMPTY``; a non-live cell with exactly three becomes ``ALIVE``;
    in every other case ``state`` is returned unchanged.
    """
    if state != ALIVE:
        # Regenerate only on exactly three neighbours.
        return ALIVE if neighbors == 3 else state
    if neighbors < 2 or neighbors > 3:
        return EMPTY  # Die: too few or too many
    return state

def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and pass it to ``set``.

    Args:
        y: Row index of the cell.
        x: Column index of the cell.
        get: Callable ``get(row, col)`` used to read the current generation.
        set: Callable ``set(row, col, state)`` that receives the new state
            (the parameter name shadows the builtin inside this function).
    """
    current = get(y, x)
    live_neighbors = count_neighbors(y, x, get)
    set(y, x, game_logic(current, live_neighbors))

def simulate_threaded(grid):
    """Advance ``grid`` by one generation, using one thread per cell.

    Args:
        grid: Object exposing ``height``, ``width`` and ``get(y, x)``.

    Returns:
        A new ``LockingGrid`` of the same size holding the next generation.
    """
    successor = LockingGrid(grid.height, grid.width)

    workers = []
    for row in range(grid.height):
        for col in range(grid.width):
            # Fan out: each cell is stepped in its own, immediately started thread.
            worker = Thread(
                target=step_cell,
                args=(row, col, grid.get, successor.set),
            )
            worker.start()
            workers.append(worker)

    # Fan in: wait for every cell before handing back the new grid.
    for worker in workers:
        worker.join()

    return successor


# Example 3
class ColumnPrinter:
    """Collect multi-line strings and render them side by side.

    Each appended string becomes one column. ``str()`` produces a header row
    with the column indices, followed by the columns' lines joined with
    ``' | '``.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add ``data`` (a possibly multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row and all column lines as one string.

        The header centres each column index over the width of that column's
        first line, so it stays aligned for even widths and multi-digit
        indices whenever the index fits. Columns with fewer lines than the
        tallest one (including empty strings) are padded with blanks instead
        of raising ``IndexError``.
        """
        # Split every column once rather than once per output cell.
        split_columns = [data.splitlines() for data in self.columns]
        widths = [len(lines[0]) if lines else 0 for lines in split_columns]
        body_height = max((len(lines) for lines in split_columns), default=0)

        header = [str(i).center(width) for i, width in enumerate(widths)]
        rows = [' | '.join(header)]
        for j in range(body_height):
            cells = [
                lines[j] if j < len(lines) else ' ' * width
                for lines, width in zip(split_columns, widths)
            ]
            rows.append(' | '.join(cells))
        return '\n'.join(rows)

# Seed a 50x90 locking grid with five live cells.
grid = LockingGrid(50, 90)            # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
import time
# Wall-clock start; the measured span covers rendering plus 100 threaded generations.
st=time.time()
for i in range(100):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed
print(f'simulation time: {time.time() - st}')
print(columns)

单线程版本


# Cell-state markers: stored in the grid rows and emitted verbatim when a grid is printed.
ALIVE = '*'
EMPTY = '-'

class Grid:
    """Fixed-size 2-D board of cell states whose coordinates wrap around.

    ``height`` and ``width`` are the dimensions given at construction;
    ``rows`` is a list of ``height`` lists, each holding ``width`` cell
    states (initially ``EMPTY``).
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One fresh list per row, so rows never alias each other.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Return the board as text, one newline-terminated line per row."""
        return ''.join(''.join(row) + '\n' for row in self.rows)


# Seed a 50x90 grid with five live cells and print the starting board.
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)


def count_neighbors(y, x, get):
    """Count how many of the eight cells surrounding ``(y, x)`` are alive.

    Args:
        y: Row index of the centre cell.
        x: Column index of the centre cell.
        get: Callable ``get(row, col)`` returning the state of a cell.

    Returns:
        The number of neighbouring cells whose state equals ``ALIVE``.
    """
    # (dy, dx) steps, clockwise starting from north.
    offsets = (
        (-1, 0),   # North
        (-1, 1),   # Northeast
        (0, 1),    # East
        (1, 1),    # Southeast
        (1, 0),    # South
        (1, -1),   # Southwest
        (0, -1),   # West
        (-1, -1),  # Northwest
    )
    return sum(1 for dy, dx in offsets if get(y + dy, x + dx) == ALIVE)

# Self-check for count_neighbors using a fake grid accessor.
alive = {(9, 5), (9, 6)}
seen = set()

def fake_get(y, x):
    """Record every queried position in ``seen`` and report cells in ``alive`` as ALIVE."""
    position = (y, x)
    seen.add(position)
    return ALIVE if position in alive else EMPTY

count = count_neighbors(10, 5, fake_get)
assert count == 2

# Exactly the eight cells around (10, 5) must have been queried.
expected_seen = {
    (9, 5),  (9, 6),  (10, 6), (11, 6),
    (11, 5), (11, 4), (10, 4), (9, 4)
}
assert seen == expected_seen


def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    A live cell with fewer than two or more than three live neighbours
    becomes ``EMPTY``; a non-live cell with exactly three becomes ``ALIVE``;
    in every other case ``state`` is returned unchanged.
    """
    if state != ALIVE:
        # Regenerate only on exactly three neighbours.
        return ALIVE if neighbors == 3 else state
    if neighbors < 2 or neighbors > 3:
        return EMPTY  # Die: too few or too many
    return state

# Self-check for game_logic: both starting states against 0-4 live neighbours.
assert game_logic(ALIVE, 0) == EMPTY
assert game_logic(ALIVE, 1) == EMPTY
assert game_logic(ALIVE, 2) == ALIVE
assert game_logic(ALIVE, 3) == ALIVE
assert game_logic(ALIVE, 4) == EMPTY
assert game_logic(EMPTY, 0) == EMPTY
assert game_logic(EMPTY, 1) == EMPTY
assert game_logic(EMPTY, 2) == EMPTY
assert game_logic(EMPTY, 3) == ALIVE
assert game_logic(EMPTY, 4) == EMPTY


def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and pass it to ``set``.

    Args:
        y: Row index of the cell.
        x: Column index of the cell.
        get: Callable ``get(row, col)`` used to read the current generation.
        set: Callable ``set(row, col, state)`` that receives the new state
            (the parameter name shadows the builtin inside this function).
    """
    current = get(y, x)
    live_neighbors = count_neighbors(y, x, get)
    set(y, x, game_logic(current, live_neighbors))

# Self-check for step_cell using fake accessors; `new_state` captures the last value written.
alive = {(10, 5), (9, 5), (9, 6)}
new_state = None

def fake_get(y, x):
    """Report cells listed in ``alive`` as ALIVE and every other cell as EMPTY."""
    return ALIVE if (y, x) in alive else EMPTY

def fake_set(y, x, state):
    """Store the written state in the module-level ``new_state``; the position is ignored."""
    global new_state
    new_state = state

# Stay alive
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE

# Stay dead
alive.remove((10, 5))
step_cell(10, 5, fake_get, fake_set)
assert new_state == EMPTY

# Regenerate
alive.add((10, 6))
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE


def simulate(grid):
    """Advance ``grid`` by one generation, stepping the cells one after another.

    Args:
        grid: Object exposing ``height``, ``width`` and ``get(y, x)``.

    Returns:
        A new ``Grid`` of the same size holding the next generation.
    """
    successor = Grid(grid.height, grid.width)
    # Row-major walk over every cell position.
    positions = (
        (row, col)
        for row in range(grid.height)
        for col in range(grid.width)
    )
    for row, col in positions:
        step_cell(row, col, grid.get, successor.set)
    return successor


class ColumnPrinter:
    """Collect multi-line strings and render them side by side.

    Each appended string becomes one column. ``str()`` produces a header row
    with the column indices, followed by the columns' lines joined with
    ``' | '``.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add ``data`` (a possibly multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row and all column lines as one string.

        The header centres each column index over the width of that column's
        first line, so it stays aligned for even widths and multi-digit
        indices whenever the index fits. Columns with fewer lines than the
        tallest one (including empty strings) are padded with blanks instead
        of raising ``IndexError``.
        """
        # Split every column once rather than once per output cell.
        split_columns = [data.splitlines() for data in self.columns]
        widths = [len(lines[0]) if lines else 0 for lines in split_columns]
        body_height = max((len(lines) for lines in split_columns), default=0)

        header = [str(i).center(width) for i, width in enumerate(widths)]
        rows = [' | '.join(header)]
        for j in range(body_height):
            cells = [
                lines[j] if j < len(lines) else ' ' * width
                for lines, width in zip(split_columns, widths)
            ]
            rows.append(' | '.join(cells))
        return '\n'.join(rows)

# Time 100 single-threaded generations of the grid seeded earlier in this listing.
columns = ColumnPrinter()
import time
st=time.time()
for i in range(100):
    columns.append(str(grid))
    grid = simulate(grid)
print(f'simulate time: {time.time() - st}')
print(columns)

我们启动程序,看两个程序的耗时:

多线程版本:simulation time: 14.29249095916748;单线程版本:simulate time: 0.6782510280609131

可以看到,多线程版本的耗时约为单线程版本的21倍。

下面用cProfile分别对多线程版本和单线程版本的代码做性能分析(profiling),看看时间都花在了哪里。

对代码做一些修改:

import cProfile
import pstats

def run_simulation():
    """Run 100 threaded generations on a 50x90 grid, then print timing and history."""
    import time

    board = LockingGrid(50, 90)
    # Seed pattern: five live cells.
    for row, col in ((0, 3), (1, 4), (2, 2), (2, 3), (2, 4)):
        board.set(row, col, ALIVE)

    history = ColumnPrinter()
    started = time.time()
    for _ in range(100):
        # Record the current generation before advancing to the next one.
        history.append(str(board))
        board = simulate_threaded(board)
    elapsed = time.time() - started
    print(f'simulation time: {elapsed}')
    print(history)

# Profile one full run and dump the raw statistics to the file 'threaded_prof'.
cProfile.run('run_simulation()','threaded_prof')
# Write a text report sorted by cumulative time.
with open('threaded_profile.txt', 'w') as f:
    p = pstats.Stats('threaded_prof', stream=f)
    p.sort_stats('cumulative').print_stats()

print("Profile saved to 'threaded_profile.txt'")

完整的代码如下:

from threading import Lock
import cProfile
import pstats

# Cell-state markers: stored in the grid rows and emitted verbatim when a grid is printed.
ALIVE = '*'
EMPTY = '-'

class Grid:
    """Fixed-size 2-D board of cell states whose coordinates wrap around.

    ``height`` and ``width`` are the dimensions given at construction;
    ``rows`` is a list of ``height`` lists, each holding ``width`` cell
    states (initially ``EMPTY``).
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # One fresh list per row, so rows never alias each other.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        return row[x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (taken modulo the size)."""
        row = self.rows[y % self.height]
        row[x % self.width] = state

    def __str__(self):
        """Return the board as text, one newline-terminated line per row."""
        return ''.join(''.join(row) + '\n' for row in self.rows)

class LockingGrid(Grid):
    """Grid whose reads, writes and text rendering each hold one shared lock.

    Every operation acquires ``self.lock`` for its whole duration and then
    delegates to the ``Grid`` implementation.
    """

    def __init__(self, height, width):
        super().__init__(height, width)
        # A plain (non-reentrant) Lock guarding all access to the cells.
        self.lock = Lock()

    def get(self, y, x):
        """Return the state at ``(y, x)`` while holding the lock."""
        with self.lock:
            state = super().get(y, x)
        return state

    def set(self, y, x, state):
        """Store ``state`` at ``(y, x)`` while holding the lock."""
        with self.lock:
            result = super().set(y, x, state)
        return result

    def __str__(self):
        """Render the grid as text while holding the lock."""
        with self.lock:
            text = super().__str__()
        return text


# Example 2
from threading import Thread

def count_neighbors(y, x, get):
    """Count how many of the eight cells surrounding ``(y, x)`` are alive.

    Args:
        y: Row index of the centre cell.
        x: Column index of the centre cell.
        get: Callable ``get(row, col)`` returning the state of a cell.

    Returns:
        The number of neighbouring cells whose state equals ``ALIVE``.
    """
    # (dy, dx) steps, clockwise starting from north.
    offsets = (
        (-1, 0),   # North
        (-1, 1),   # Northeast
        (0, 1),    # East
        (1, 1),    # Southeast
        (1, 0),    # South
        (1, -1),   # Southwest
        (0, -1),   # West
        (-1, -1),  # Northwest
    )
    return sum(1 for dy, dx in offsets if get(y + dy, x + dx) == ALIVE)

# def game_logic(state, neighbors):
#     # Do some blocking input/output in here:
#     data = my_socket.recv(100)

def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    A live cell with fewer than two or more than three live neighbours
    becomes ``EMPTY``; a non-live cell with exactly three becomes ``ALIVE``;
    in every other case ``state`` is returned unchanged.
    """
    if state != ALIVE:
        # Regenerate only on exactly three neighbours.
        return ALIVE if neighbors == 3 else state
    if neighbors < 2 or neighbors > 3:
        return EMPTY  # Die: too few or too many
    return state

def step_cell(y, x, get, set):
    """Compute the next state of cell ``(y, x)`` and pass it to ``set``.

    Args:
        y: Row index of the cell.
        x: Column index of the cell.
        get: Callable ``get(row, col)`` used to read the current generation.
        set: Callable ``set(row, col, state)`` that receives the new state
            (the parameter name shadows the builtin inside this function).
    """
    current = get(y, x)
    live_neighbors = count_neighbors(y, x, get)
    set(y, x, game_logic(current, live_neighbors))

def simulate_threaded(grid):
    """Advance ``grid`` by one generation, using one thread per cell.

    Args:
        grid: Object exposing ``height``, ``width`` and ``get(y, x)``.

    Returns:
        A new ``LockingGrid`` of the same size holding the next generation.
    """
    successor = LockingGrid(grid.height, grid.width)

    workers = []
    for row in range(grid.height):
        for col in range(grid.width):
            # Fan out: each cell is stepped in its own, immediately started thread.
            worker = Thread(
                target=step_cell,
                args=(row, col, grid.get, successor.set),
            )
            worker.start()
            workers.append(worker)

    # Fan in: wait for every cell before handing back the new grid.
    for worker in workers:
        worker.join()

    return successor


# Example 3
class ColumnPrinter:
    """Collect multi-line strings and render them side by side.

    Each appended string becomes one column. ``str()`` produces a header row
    with the column indices, followed by the columns' lines joined with
    ``' | '``.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add ``data`` (a possibly multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row and all column lines as one string.

        The header centres each column index over the width of that column's
        first line, so it stays aligned for even widths and multi-digit
        indices whenever the index fits. Columns with fewer lines than the
        tallest one (including empty strings) are padded with blanks instead
        of raising ``IndexError``.
        """
        # Split every column once rather than once per output cell.
        split_columns = [data.splitlines() for data in self.columns]
        widths = [len(lines[0]) if lines else 0 for lines in split_columns]
        body_height = max((len(lines) for lines in split_columns), default=0)

        header = [str(i).center(width) for i, width in enumerate(widths)]
        rows = [' | '.join(header)]
        for j in range(body_height):
            cells = [
                lines[j] if j < len(lines) else ' ' * width
                for lines, width in zip(split_columns, widths)
            ]
            rows.append(' | '.join(cells))
        return '\n'.join(rows)

def run_simulation():
    """Run 100 threaded generations on a 50x90 grid, then print timing and history."""
    import time

    board = LockingGrid(50, 90)
    # Seed pattern: five live cells.
    for row, col in ((0, 3), (1, 4), (2, 2), (2, 3), (2, 4)):
        board.set(row, col, ALIVE)

    history = ColumnPrinter()
    started = time.time()
    for _ in range(100):
        # Record the current generation before advancing to the next one.
        history.append(str(board))
        board = simulate_threaded(board)
    elapsed = time.time() - started
    print(f'simulation time: {elapsed}')
    print(history)

# Profile one full run and dump the raw statistics to the file 'threaded_prof'.
cProfile.run('run_simulation()','threaded_prof')
# Write a text report sorted by cumulative time.
with open('threaded_profile.txt', 'w') as f:
    p = pstats.Stats('threaded_prof', stream=f)
    p.sort_stats('cumulative').print_stats()

print("Profile saved to 'threaded_profile.txt'")

多线程版本profiling的结果为:

Tue Jul  9 09:06:00 2024    threaded_prof

         22071779 function calls in 16.326 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   16.326   16.326 {built-in method builtins.exec}
        1    0.000    0.000   16.326   16.326 <string>:1(<module>)
        1    0.180    0.180   16.326   16.326 /Users/wenyan/projects2024/efficient_python/fan-out.py:134(run_simulation)
      100    0.534    0.005   16.037    0.160 /Users/wenyan/projects2024/efficient_python/fan-out.py:88(simulate_threaded)
   450000    0.469    0.000   12.055    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:873(start)
   450000    0.397    0.000    8.890    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:556(wait)
   450000    0.586    0.000    7.908    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:280(wait)
  2250000    7.031    0.000    7.031    0.000 {method 'acquire' of '_thread.lock' objects}
   450000    2.621    0.000    2.621    0.000 {built-in method _thread.start_new_thread}
   450000    1.053    0.000    2.570    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:795(__init__)
   450000    0.150    0.000    0.857    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1021(join)
   450000    0.198    0.000    0.798    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:521(__init__)
   450000    0.123    0.000    0.620    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1059(_wait_for_tstate_lock)
   450000    0.552    0.000    0.552    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:228(__init__)
   450000    0.437    0.000    0.475    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:256(__enter__)
   450000    0.193    0.000    0.442    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:985(_stop)
   450000    0.192    0.000    0.245    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/_weakrefset.py:82(add)
   450000    0.098    0.000    0.186    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:768(_maintain_shutdown_locks)
   900000    0.144    0.000    0.175    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1338(current_thread)
   450000    0.172    0.000    0.172    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1209(_make_invoke_excepthook)
   450000    0.081    0.000    0.132    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:268(_acquire_restore)
   450000    0.076    0.000    0.122    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:271(_is_owned)
   450000    0.114    0.000    0.114    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:750(_newname)
   450000    0.093    0.000    0.110    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:259(__exit__)
   450000    0.086    0.000    0.110    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:265(_release_save)
   900000    0.103    0.000    0.103    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1127(daemon)
   900101    0.090    0.000    0.090    0.000 {built-in method _thread.allocate_lock}
   450000    0.052    0.000    0.068    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/_weakrefset.py:39(_remove)
   900000    0.068    0.000    0.068    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:529(is_set)
   450000    0.049    0.000    0.060    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:778(<listcomp>)
   450000    0.053    0.000    0.053    0.000 {method 'add' of 'set' objects}
   900000    0.040    0.000    0.040    0.000 {method 'release' of '_thread.lock' objects}
   900105    0.039    0.000    0.039    0.000 {method '__exit__' of '_thread.lock' objects}
   450000    0.038    0.000    0.038    0.000 {method '__enter__' of '_thread.lock' objects}
   450000    0.036    0.000    0.036    0.000 {method '__exit__' of '_thread.RLock' objects}
   900000    0.031    0.000    0.031    0.000 {built-in method _thread.get_ident}
   450000    0.029    0.000    0.029    0.000 {method 'difference_update' of 'set' objects}
        2    0.006    0.003    0.024    0.012 {built-in method builtins.print}
   900100    0.022    0.000    0.022    0.000 {method 'locked' of '_thread.lock' objects}
   450000    0.022    0.000    0.022    0.000 {method 'append' of 'collections.deque' objects}
   455150    0.020    0.000    0.020    0.000 {method 'append' of 'list' objects}
        1    0.003    0.003    0.018    0.018 /Users/wenyan/projects2024/efficient_python/fan-out.py:113(__str__)
      100    0.000    0.000    0.017    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:35(__str__)
      100    0.017    0.000    0.017    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:22(__str__)
   450000    0.016    0.000    0.016    0.000 {method 'discard' of 'set' objects}
     5200    0.014    0.000    0.014    0.000 {method 'splitlines' of 'str' objects}
      101    0.000    0.000    0.001    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:31(__init__)
      101    0.001    0.000    0.001    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:9(__init__)
     5200    0.000    0.000    0.000    0.000 {built-in method builtins.max}
     5300    0.000    0.000    0.000    0.000 {built-in method builtins.len}
        1    0.000    0.000    0.000    0.000 {method 'join' of 'str' objects}
      100    0.000    0.000    0.000    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:110(append)
        5    0.000    0.000    0.000    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:43(set)
        5    0.000    0.000    0.000    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:19(set)
        2    0.000    0.000    0.000    0.000 {built-in method time.time}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:107(__init__)



解读:

Tue Jul  9 09:06:00 2024    threaded_prof

         22071779 function calls in 16.326 seconds

   Ordered by: cumulative time

这部分提供了文件生成的时间、总共调用了多少次函数、程序执行的总时间,以及结果的排序方式(这里是按累积时间排序的)。

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)

ncalls: 函数被调用的次数。

tottime: 在该函数中花费的总时间,不包括调用其他函数的时间。

percall: 平均每次调用该函数花费的时间,即 tottime/ncalls。

cumtime: 在该函数及其所有子函数中花费的总时间。

percall: 平均每次调用该函数及其所有子函数花费的时间,即 cumtime/ncalls。

filename:lineno(function): 函数定义所在的文件名和行号,以及函数名。

下面的是主模块和执行环境

        1    0.000    0.000   16.326   16.326 {built-in method builtins.exec}
        1    0.000    0.000   16.326   16.326 <string>:1(<module>)
        1    0.180    0.180   16.326   16.326 /Users/wenyan/projects2024/efficient_python/fan-out.py:134(run_simulation)

这些行表示执行的主模块和调用链。run_simulation 函数总耗时 16.326 秒。

下面的是主要函数

      100    0.534    0.005   16.037    0.160 /Users/wenyan/projects2024/efficient_python/fan-out.py:88(simulate_threaded)

simulate_threaded 函数被调用 100 次,总耗时 16.037 秒,平均每次调用 0.160 秒。 被调用100次,与我们代码的100次的for循环可以对应起来。

下面的是线程管理部分

   450000    0.469    0.000   12.055    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:873(start)
   450000    0.397    0.000    8.890    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:556(wait)
   450000    0.586    0.000    7.908    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:280(wait)
  2250000    7.031    0.000    7.031    0.000 {method 'acquire' of '_thread.lock' objects}

这些行显示线程的启动和等待占用了大量时间。start 和 wait 方法的累计耗时(cumtime)分别为 12.055 秒和 8.890 秒,而线程锁的获取 (acquire 方法) 总耗时 7.031 秒。注意 cumtime 是嵌套统计的:start 内部会调用 wait,wait 内部又会调用 acquire,所以这几个数字不能直接相加。

线程函数

   450000    2.621    0.000    2.621    0.000 {built-in method _thread.start_new_thread}
   450000    1.053    0.000    2.570    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:795(__init__)

真正创建新线程的底层调用 _thread.start_new_thread 占用了 2.621 秒,Thread 对象的初始化 (`__init__` 方法) 总耗时 2.570 秒。

锁和状态管理

   450000    0.150    0.000    0.857    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1021(join)
   450000    0.198    0.000    0.798    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:521(__init__)

线程的 join 方法(等待线程结束)耗时 0.857 秒;threading.py:521 处的 `__init__` 是每条线程内部用来标记"已启动"的 Event 对象的初始化,耗时 0.798 秒。

辅助函数

   900000    0.144    0.000    0.175    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1338(current_thread)
   450000    0.172    0.000    0.172    0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1209(_make_invoke_excepthook)

一些辅助函数如 current_thread 和 _make_invoke_excepthook 的调用次数和耗时较少。

总结:

性能分析结果显示,线程的启动和管理占用了大量时间,特别是在锁的获取和等待上。这是程序比单线程慢的主要原因。要优化多线程程序,可以考虑以下几种方法:

(1)减少线程的启动和管理: 尽量减少线程的创建次数,使用线程池等方法来管理线程。(2)优化锁的使用: 尽量减少锁的争用和等待时间,使用更高效的锁机制。(3)减少 I/O 等待: 如果程序有大量的 I/O 操作,考虑使用异步 I/O 来提高性能。

我们把单线程的程序同样使用cProfile进行修改,然后查看profiling的结果。

import cProfile
import pstats

ALIVE = '*'  # marker stored in a cell that is alive
EMPTY = '-'  # marker stored in a cell that is not alive


class Grid:
    """Rectangular board of cell states with wrap-around coordinates.

    The board is stored in ``rows`` as ``height`` lists of ``width`` cell
    markers, all initialised to ``EMPTY``.  ``get`` and ``set`` reduce the
    row index modulo ``height`` and the column index modulo ``width``, so
    coordinates that fall off one edge land on the opposite edge.
    """

    def __init__(self, height, width):
        self.height = height
        self.width = width
        # Build a separate list object for every row so that writing to one
        # row can never show up in another.
        self.rows = [[EMPTY] * width for _ in range(height)]

    def get(self, y, x):
        """Return the state stored at row ``y``, column ``x`` (wrapped)."""
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (wrapped)."""
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        """Render the board as text, one newline-terminated line per row."""
        # str.join assembles each row and the whole board in a single pass
        # instead of growing a string one character at a time with ``+=``.
        return ''.join(''.join(row) + '\n' for row in self.rows)


# Build a 50-row by 90-column board, switch on five cells (together they
# form the classic "glider" shape) and print the starting position.
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)


def count_neighbors(y, x, get):
    """Return how many of the eight cells around (y, x) are alive.

    ``get`` is invoked once per neighbor as ``get(row, column)``, going
    clockwise from the cell directly above (north).  A neighbor counts
    when the value returned for it equals ``ALIVE``.
    """
    # (row offset, column offset) for N, NE, E, SE, S, SW, W, NW.
    offsets = (
        (-1, 0), (-1, 1), (0, 1), (1, 1),
        (1, 0), (1, -1), (0, -1), (-1, -1),
    )
    # Query every neighbor first, then tally the live ones.
    surrounding = [get(y + dy, x + dx) for dy, dx in offsets]
    return sum(1 for cell in surrounding if cell == ALIVE)

# Self-check for count_neighbors: only the cells north and north-east of
# (10, 5) are alive, so exactly two live neighbors must be reported.
alive = {(9, 5), (9, 6)}
seen = set()  # every coordinate the stub getter has been asked about

def fake_get(y, x):
    """Stub getter that records the queried coordinate in ``seen``."""
    seen.add((y, x))
    if (y, x) in alive:
        return ALIVE
    return EMPTY

count = count_neighbors(10, 5, fake_get)
assert count == 2

# All eight cells of the 3x3 square centred on (10, 5), minus the centre
# itself, must have been queried.
expected_seen = {
    (row, col)
    for row in (9, 10, 11)
    for col in (4, 5, 6)
    if (row, col) != (10, 5)
}
assert seen == expected_seen


def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbor count.

    A cell whose state is ``ALIVE`` becomes ``EMPTY`` with fewer than two
    or more than three live neighbors; any other cell becomes ``ALIVE``
    with exactly three.  In every remaining case ``state`` is returned
    unchanged.
    """
    if state != ALIVE:
        # Regenerate on exactly three neighbors, otherwise stay as-is.
        return ALIVE if neighbors == 3 else state
    if neighbors < 2 or neighbors > 3:
        # Die: too few or too many neighbors.
        return EMPTY
    return state

# A live cell dies with 0, 1 or 4 live neighbors and survives with 2 or 3.
assert game_logic(ALIVE, 0) == EMPTY
assert game_logic(ALIVE, 1) == EMPTY
assert game_logic(ALIVE, 2) == ALIVE
assert game_logic(ALIVE, 3) == ALIVE
assert game_logic(ALIVE, 4) == EMPTY
# An empty cell comes alive only with exactly 3 live neighbors.
assert game_logic(EMPTY, 0) == EMPTY
assert game_logic(EMPTY, 1) == EMPTY
assert game_logic(EMPTY, 2) == EMPTY
assert game_logic(EMPTY, 3) == ALIVE
assert game_logic(EMPTY, 4) == EMPTY


def step_cell(y, x, get, set):
    """Compute the next state of cell (y, x) and hand it to ``set``.

    The current state and the neighbor states are read through ``get``;
    the result of ``game_logic`` is written with ``set(y, x, next_state)``.
    Nothing is returned.
    """
    current = get(y, x)
    live_count = count_neighbors(y, x, get)
    set(y, x, game_logic(current, live_count))

# Self-check for step_cell.  The cell under test is (10, 5); (9, 5) and
# (9, 6) are the cells directly north and north-east of it.
alive = {(10, 5), (9, 5), (9, 6)}
new_state = None  # most recent state passed to fake_set

def fake_get(y, x):
    """Stub getter: a cell is alive exactly when its coordinate is in ``alive``."""
    return ALIVE if (y, x) in alive else EMPTY

def fake_set(y, x, state):
    """Stub setter: remember the written state in the global ``new_state``."""
    global new_state
    new_state = state

# Stay alive: a live cell with two live neighbors survives.
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE

# Stay dead: with the cell itself removed it is empty and has only two
# live neighbors, so it stays empty.
alive.remove((10, 5))
step_cell(10, 5, fake_get, fake_set)
assert new_state == EMPTY

# Regenerate: (10, 6) is a third live neighbor, so the empty cell comes alive.
alive.add((10, 6))
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE


def simulate(grid):
    """Return a new ``Grid`` holding the generation that follows ``grid``.

    Every cell is stepped in row-major order: states are read from
    ``grid`` and the results are written into a fresh grid of the same
    height and width, so ``grid`` itself is only read.
    """
    successor = Grid(grid.height, grid.width)
    read, write = grid.get, successor.set
    columns = range(grid.width)
    for y in range(grid.height):
        for x in columns:
            step_cell(y, x, read, write)
    return successor


class ColumnPrinter:
    """Collect multi-line text blocks and render them side by side.

    Each appended string becomes one column.  ``str()`` produces a header
    row containing the column indexes, followed by the columns' lines
    joined with ``' | '``.
    """

    def __init__(self):
        self.columns = []

    def append(self, data):
        """Add ``data`` (a possibly multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return the header row plus all column lines as one string.

        The header cell of a column is its index surrounded on both sides
        by ``len(first_line) // 2`` spaces.  A column with fewer lines than
        the tallest one is filled with blanks as wide as its first line, and
        an empty column is treated as a single empty line, so neither case
        raises ``IndexError``.
        """
        # Split every column exactly once rather than once per output row.
        split_columns = [data.splitlines() or [''] for data in self.columns]
        # One extra row for the header.
        row_count = max((len(lines) for lines in split_columns), default=0) + 1

        rows = []
        for j in range(row_count):
            cells = []
            for i, lines in enumerate(split_columns):
                if j == 0:
                    padding = ' ' * (len(lines[0]) // 2)
                    cells.append(padding + str(i) + padding)
                elif j <= len(lines):
                    cells.append(lines[j - 1])
                else:
                    # Column ran out of lines: keep the separators aligned.
                    cells.append(' ' * len(lines[0]))
            rows.append(' | '.join(cells))

        return '\n'.join(rows)

def run_simulation(generations=100):
    """Run the single-threaded simulation and print timing plus every board.

    A 50x90 grid is seeded with five live cells, then advanced
    ``generations`` times with ``simulate``.  The text of each generation
    is collected before it is advanced.  Afterwards the elapsed time of
    the loop and all collected boards (side by side) are printed.

    Args:
        generations: How many generations to compute.  Defaults to 100,
            the value that used to be hard-coded.
    """
    import time

    grid = Grid(50, 90)
    grid.set(0, 3, ALIVE)
    grid.set(1, 4, ALIVE)
    grid.set(2, 2, ALIVE)
    grid.set(2, 3, ALIVE)
    grid.set(2, 4, ALIVE)
    columns = ColumnPrinter()
    # perf_counter() is the clock intended for measuring short intervals;
    # time.time() follows the wall clock and can jump if that is adjusted.
    st = time.perf_counter()
    for _ in range(generations):
        columns.append(str(grid))
        grid = simulate(grid)
    print(f'simulate time: {time.perf_counter() - st}')
    print(columns)

# Profile one complete run and save the collected statistics to a file.
cProfile.run('run_simulation()', 'threaded_prof_single_thread')

# Load the saved statistics and write a report, sorted by cumulative time,
# into a text file.
with open('threaded_profile_single_thread.txt', 'w') as f:
    p = pstats.Stats('threaded_prof_single_thread', stream=f)
    p.sort_stats('cumulative')
    p.print_stats()

print("Profile saved to 'threaded_profile_single_thread.txt'")

结果为:

Tue Jul  9 23:53:23 2024    threaded_prof_single_thread

         5871267 function calls in 1.004 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    1.004    1.004 {built-in method builtins.exec}
        1    0.000    0.000    1.004    1.004 <string>:1(<module>)
        1    0.001    0.001    1.004    1.004 /Users/wenyan/projects2024/efficient_python/conway_game.py:163(run_simulation)
      100    0.055    0.001    0.963    0.010 /Users/wenyan/projects2024/efficient_python/conway_game.py:127(simulate)
   450000    0.149    0.000    0.907    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:96(step_cell)
   450000    0.359    0.000    0.648    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:39(count_neighbors)
  4050000    0.328    0.000    0.328    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:15(get)
   450005    0.046    0.000    0.046    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:18(set)
   450000    0.024    0.000    0.024    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:73(game_logic)
        2    0.005    0.003    0.023    0.012 {built-in method builtins.print}
        1    0.003    0.003    0.018    0.018 /Users/wenyan/projects2024/efficient_python/conway_game.py:142(__str__)
      100    0.016    0.000    0.016    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:21(__str__)
     5200    0.014    0.000    0.014    0.000 {method 'splitlines' of 'str' objects}
      101    0.001    0.000    0.001    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:8(__init__)
     5200    0.000    0.000    0.000    0.000 {built-in method builtins.max}
     5150    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
     5300    0.000    0.000    0.000    0.000 {built-in method builtins.len}
      100    0.000    0.000    0.000    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:139(append)
        1    0.000    0.000    0.000    0.000 {method 'join' of 'str' objects}
        2    0.000    0.000    0.000    0.000 {built-in method time.time}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:136(__init__)

可以看到,单线程版本之所以快,是因为没有线程管理的开销:整个模拟只用了约1秒,时间几乎都花在step_cell、count_neighbors和get这些游戏逻辑本身上。

所以再回顾一下这一节的总结:

目前的多线程实现有比较大的缺点。

第一,要专门采用工具来协调这些Thread实例,才能保证它们不会破坏程序中的数据。这会让多线程版本的代码理解起来比较困难,因为早前的单线程版是按照先后顺序实现的,不需要有专门的协调机制,所以读起来比现在的版本好懂。这种复杂的多线程代码,随时间的推移更加难以扩展与维护。

第二,线程占用的内存比较多,每条线程大约占8MB。当然,如果网格只有5×9共45个单元格,现在的许多电脑完全能承担起这45条线程所占用的内存总量。但本文做profiling时用的是50×90的网格,每推进一代就要创建4500条线程(100代共450000次start调用);如果游戏里有一万个单元格,那就必须创建一万条线程,这个内存量恐怕很多电脑不支持。所以,给每项操作都新开一条线程是不现实的。

第三,线程的开销比较大。系统频繁地切换线程,会降低程序的运行效率。对于本例来说,每推进一代,就要停止一批旧线程并启动一批新线程,这样开销很大,于是程序除了要花100毫秒等待I/O操作结束,还会因为停止并启动线程而耽误一些时间。

每次都手工创建一批线程,是有很多缺点的,例如:创建并运行大量线程时的开销比较大,每条线程的内存占用量比较多,而且还必须采用Lock等机制来协调这些线程。