python并发与并行(五.2) ———— 不要在每次fan-out时都新建一批Thread实例
Published:
想在Python里平行地做I/O,首先要考虑的工具当然是线程。但如果真用线程来表示fan-out模式中的执行路径,你就会发现,这样其实有很多问题。
我们用上一篇文章的conway game的实现来举例说明。
我们用线程来解决game_logic函数由于执行I/O而产生的延迟问题。首先,这些线程之间需要锁定功能对其进行协调,以确保它们所操纵的数据结构不会遭到破坏。
下面创建Grid子类,并为它添加锁定功能,让多条线程能够正确地访问同一个实例。
from threading import Lock
ALIVE = '*'
EMPTY = '-'
class Grid:
    """Rectangular grid of cell states whose coordinates wrap around.

    Both ``get`` and ``set`` reduce ``y`` modulo ``height`` and ``x`` modulo
    ``width``, so out-of-range and negative coordinates address the cell on
    the opposite edge.
    """

    def __init__(self, height, width):
        """Create a ``height`` x ``width`` grid with every cell set to EMPTY."""
        self.height = height
        self.width = width
        # Build a separate list per row so that rows never alias each other.
        self.rows = [[EMPTY] * self.width for _ in range(self.height)]

    def get(self, y, x):
        """Return the state stored at row ``y``, column ``x`` (wrapping)."""
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        """Store ``state`` at row ``y``, column ``x`` (wrapping)."""
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        """Return the grid as text: one line per row, each ending in a newline."""
        return ''.join(''.join(row) + '\n' for row in self.rows)
class LockingGrid(Grid):
    """Grid whose ``get``, ``set`` and ``__str__`` each run under one Lock.

    Every public accessor acquires ``self.lock`` before delegating to the
    ``Grid`` implementation, so only one thread at a time touches ``rows``.
    """

    def __init__(self, height, width):
        """Create the underlying grid and the lock that guards it."""
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        """Return the text rendering, produced while holding the lock."""
        with self.lock:
            return super().__str__()

    def get(self, y, x):
        """Return the state at (``y``, ``x``), read while holding the lock."""
        with self.lock:
            return super().get(y, x)

    def set(self, y, x, state):
        """Store ``state`` at (``y``, ``x``) while holding the lock."""
        with self.lock:
            return super().set(y, x, state)
接下来,改用fan-out模式实现simulate函数,为每个step_cell操作都创建一条线程,这些线程能够平行地运行并各自执行相应的I/O任务。这样的话,我们就不需要像原来那样,非得等前一次step_cell执行完,才能更新下一个单元格。然后,通过fan-in模式等待这些线程全部完工,再把网格正式演化到下一代。
from threading import Thread
def count_neighbors(y, x, get):
    """Return how many of the eight cells surrounding (y, x) are ALIVE.

    Args:
        y: Row of the cell whose neighbours are inspected.
        x: Column of the cell whose neighbours are inspected.
        get: Callable ``get(y, x)`` returning the state of a cell; it is
            called exactly once per neighbour.
    """
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    return sum(1 for state in neighbor_states if state == ALIVE)
def game_logic(state, neighbors):
    """Placeholder showing where blocking I/O would happen in game_logic.

    NOTE: ``my_socket`` is not defined anywhere in this listing, so calling
    this version raises NameError. It is only an illustration and is
    superseded by the pure-logic ``game_logic`` defined right after it.
    """
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
# 这里为了验证多线程,我们仍使用单线程的判断逻辑来做。
def game_logic(state, neighbors):
    """Return the next state of a cell given its live-neighbour count.

    Args:
        state: Current state of the cell (ALIVE or EMPTY).
        neighbors: Number of ALIVE cells among its eight neighbours.

    Returns:
        EMPTY when a live cell has fewer than 2 or more than 3 neighbours,
        ALIVE when a non-live cell has exactly 3 neighbours, and the
        unchanged ``state`` otherwise.
    """
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY  # Die: Too few
        elif neighbors > 3:
            return EMPTY  # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE  # Regenerate
    return state
def step_cell(y, x, get, set):
    """Compute the next state of cell (y, x) and hand it to ``set``.

    Args:
        y: Row of the cell to advance.
        x: Column of the cell to advance.
        get: Callable ``get(y, x)`` used to read the current generation.
        set: Callable ``set(y, x, state)`` that receives the next state.
            The parameter shadows the builtin ``set`` inside this function
            only; the name is kept so keyword callers keep working.
    """
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)
def simulate_threaded(grid):
    """Advance ``grid`` by one generation using one thread per cell.

    A new Thread running ``step_cell`` is created and started for every
    cell (fan out); once all have been started, each one is joined (fan in)
    before the new grid is returned.

    Args:
        grid: Current generation; only ``height``, ``width`` and ``get``
            are used.

    Returns:
        A new LockingGrid holding the next generation.
    """
    next_grid = LockingGrid(grid.height, grid.width)

    threads = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            thread = Thread(target=step_cell, args=args)
            thread.start()  # Fan out
            threads.append(thread)

    # NOTE: join() only waits; an exception raised inside a worker thread
    # is not re-raised here.
    for thread in threads:
        thread.join()  # Fan in

    return next_grid
负责推进单元格状态的step_cell函数可以保持原样,推动整个游戏流程的那些代码基本上也不用改,只有两个地方必须调整:一个是把网格类的名称改为LockingGrid,另一个是把simulate函数改为多线程版本的simulate_threaded。
class ColumnPrinter:
    """Collect multi-line strings and render them side by side.

    The rendering has a header row with each column's index, followed by
    the corresponding lines of every column separated by ``' | '``.
    """

    def __init__(self):
        """Start with no columns."""
        self.columns = []

    def append(self, data):
        """Add ``data`` (a multi-line string) as the next column."""
        self.columns.append(data)

    def __str__(self):
        """Return all columns joined horizontally under an index header.

        Raises:
            IndexError: If a column has fewer lines than the tallest column
                (including a column with no lines at all).
        """
        # Split every column once instead of once per rendered cell.
        split_columns = [data.splitlines() for data in self.columns]

        # One extra row on top of the tallest column for the index header.
        row_count = 1
        for lines in split_columns:
            row_count = max(row_count, len(lines) + 1)

        rows = [''] * row_count
        last_index = len(split_columns) - 1
        for j in range(row_count):
            for i, lines in enumerate(split_columns):
                # Row 0 is the header; it borrows the first line only to
                # measure how wide the column is.
                line = lines[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if i < last_index:
                    rows[j] += ' | '

        return '\n'.join(rows)
# Seed a 5x9 locking grid with five live cells.
grid = LockingGrid(5, 9)  # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

# Record each generation as a column, then advance with the threaded step.
columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed

print(columns)
完整的代码如下:
from threading import Lock
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
class LockingGrid(Grid):
def __init__(self, height, width):
super().__init__(height, width)
self.lock = Lock()
def __str__(self):
with self.lock:
return super().__str__()
def get(self, y, x):
with self.lock:
return super().get(y, x)
def set(self, y, x, state):
with self.lock:
return super().set(y, x, state)
# Example 2
from threading import Thread
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
# def game_logic(state, neighbors):
# # Do some blocking input/output in here:
# data = my_socket.recv(100)
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)
def simulate_threaded(grid):
next_grid = LockingGrid(grid.height, grid.width)
threads = []
for y in range(grid.height):
for x in range(grid.width):
args = (y, x, grid.get, next_grid.set)
thread = Thread(target=step_cell, args=args)
thread.start() # Fan out
threads.append(thread)
for thread in threads:
thread.join() # Fan in
return next_grid
# Example 3
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
grid = LockingGrid(5, 9) # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
import time
st=time.time()
for i in range(5):
columns.append(str(grid))
grid = simulate_threaded(grid) # Changed
print(f'simulation time: {time.time() - st}')
print(columns)
Output:
0 | 1 | 2 | 3 | 4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------
这样写没错,而且I/O操作确实能够平行地在各线程中执行。但是,这种方案有三个比较大的缺点。
第一,要专门采用工具来协调这些Thread实例,才能保证它们不会破坏程序中的数据。这会让多线程版本的代码理解起来比较困难,因为早前的单线程版是按照先后顺序实现的,不需要有专门的协调机制,所以读起来比现在的版本好懂。这种复杂的多线程代码,随时间的推移更加难以扩展与维护。
第二,线程占用的内存比较多,每条线程大约占8MB。当然,现在的许多电脑完全能承担起本例中45条线程所占用的内存总量。但如果游戏里有一万个单元格,那就必须创建一万条线程,这个内存量恐怕很多电脑不支持。所以,给每项操作都新开一条线程是不现实的。
第三,线程的开销比较大。系统频繁地切换线程,会降低程序的运行效率。对于本例来说,每推进一代,就要停止一批旧线程并启动一批新线程,这样开销很大,于是程序除了要花100毫秒等待I/O操作结束,还会因为停止并启动线程而耽误一些时间。
每次都手工创建一批线程,是有很多缺点的,例如:创建并运行大量线程时的开销比较大,每条线程的内存占用量比较多,而且还必须采用Lock等机制来协调这些线程。
下面我们看一下把grid的尺寸变大,同时把迭代次数变多,多线程和上一节的单线程的时间开销对比。
我们统一把grid的大小设置为(50,90)。
# 多线程版本
grid = LockingGrid(50, 90) # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
# 单线程版本
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)
同时迭代次数设置为100次。
# 多线程版本
for i in range(100):
    columns.append(str(grid))
    grid = simulate_threaded(grid)  # Changed
# 单线程版本
for i in range(100):
    columns.append(str(grid))
    grid = simulate(grid)
完整的代码如下:
多线程版本
from threading import Lock
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
class LockingGrid(Grid):
def __init__(self, height, width):
super().__init__(height, width)
self.lock = Lock()
def __str__(self):
with self.lock:
return super().__str__()
def get(self, y, x):
with self.lock:
return super().get(y, x)
def set(self, y, x, state):
with self.lock:
return super().set(y, x, state)
# Example 2
from threading import Thread
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
# def game_logic(state, neighbors):
# # Do some blocking input/output in here:
# data = my_socket.recv(100)
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)
def simulate_threaded(grid):
next_grid = LockingGrid(grid.height, grid.width)
threads = []
for y in range(grid.height):
for x in range(grid.width):
args = (y, x, grid.get, next_grid.set)
thread = Thread(target=step_cell, args=args)
thread.start() # Fan out
threads.append(thread)
for thread in threads:
thread.join() # Fan in
return next_grid
# Example 3
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
grid = LockingGrid(50, 90) # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
import time
st=time.time()
for i in range(100):
columns.append(str(grid))
grid = simulate_threaded(grid) # Changed
print(f'simulation time: {time.time() - st}')
print(columns)
单线程版本
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
# Self-check for count_neighbors: two live neighbours around (10, 5).
alive = {(9, 5), (9, 6)}
seen = set()


def fake_get(y, x):
    """Stand-in for Grid.get that records every position it is asked for."""
    position = (y, x)
    seen.add(position)
    return ALIVE if position in alive else EMPTY


count = count_neighbors(10, 5, fake_get)
assert count == 2

# Exactly the eight surrounding positions must have been queried.
expected_seen = {
    (9, 5), (9, 6), (10, 6), (11, 6),
    (11, 5), (11, 4), (10, 4), (9, 4)
}
assert seen == expected_seen
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
assert game_logic(ALIVE, 0) == EMPTY
assert game_logic(ALIVE, 1) == EMPTY
assert game_logic(ALIVE, 2) == ALIVE
assert game_logic(ALIVE, 3) == ALIVE
assert game_logic(ALIVE, 4) == EMPTY
assert game_logic(EMPTY, 0) == EMPTY
assert game_logic(EMPTY, 1) == EMPTY
assert game_logic(EMPTY, 2) == EMPTY
assert game_logic(EMPTY, 3) == ALIVE
assert game_logic(EMPTY, 4) == EMPTY
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)
# Self-check for step_cell using in-memory stand-ins for the grid accessors.
alive = {(10, 5), (9, 5), (9, 6)}
new_state = None


def fake_get(y, x):
    """Stand-in for Grid.get backed by the module-level ``alive`` set."""
    return ALIVE if (y, x) in alive else EMPTY


def fake_set(y, x, state):
    """Stand-in for Grid.set that remembers only the last state written."""
    global new_state
    new_state = state


# Stay alive
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE

# Stay dead
alive.remove((10, 5))
step_cell(10, 5, fake_get, fake_set)
assert new_state == EMPTY

# Regenerate
alive.add((10, 6))
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE
def simulate(grid):
    """Advance ``grid`` by one generation, one cell after another.

    Args:
        grid: Current generation; only ``height``, ``width`` and ``get``
            are used.

    Returns:
        A new Grid holding the next generation; ``grid`` is only read
        through ``get``.
    """
    next_grid = Grid(grid.height, grid.width)
    for y in range(grid.height):
        for x in range(grid.width):
            step_cell(y, x, grid.get, next_grid.set)
    return next_grid
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
columns = ColumnPrinter()
import time
st=time.time()
for i in range(100):
columns.append(str(grid))
grid = simulate(grid)
print(f'simulate time: {time.time() - st}')
print(columns)
我们启动程序,看两个程序的耗时:
多线程 simulation time: 14.29249095916748 单线程 simulate time: 0.6782510280609131
可以看到多线程的耗时是单线程版本的21倍。
我们对多线程的代码使用cProfile进行性能的profiling看一下。同时也对单线程的做profiling看一下。
对代码做一些修改:
import cProfile
import pstats
def run_simulation():
    """Run 100 threaded generations on a 50x90 grid and print the timing.

    Wrapped in a function so that cProfile.run can profile the whole
    simulation through a single call. Prints the elapsed wall-clock time
    followed by every recorded generation.
    """
    grid = LockingGrid(50, 90)  # Changed
    grid.set(0, 3, ALIVE)
    grid.set(1, 4, ALIVE)
    grid.set(2, 2, ALIVE)
    grid.set(2, 3, ALIVE)
    grid.set(2, 4, ALIVE)

    columns = ColumnPrinter()
    import time
    st = time.time()
    for i in range(100):
        columns.append(str(grid))
        grid = simulate_threaded(grid)  # Changed
    print(f'simulation time: {time.time() - st}')
    print(columns)
# Profile the whole simulation and dump the raw stats to 'threaded_prof'.
cProfile.run('run_simulation()', 'threaded_prof')

# Write a readable report, sorted by cumulative time, to a text file.
with open('threaded_profile.txt', 'w') as f:
    p = pstats.Stats('threaded_prof', stream=f)
    p.sort_stats('cumulative').print_stats()

print("Profile saved to 'threaded_profile.txt'")
完整的代码如下:
from threading import Lock
import cProfile
import pstats
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
class LockingGrid(Grid):
def __init__(self, height, width):
super().__init__(height, width)
self.lock = Lock()
def __str__(self):
with self.lock:
return super().__str__()
def get(self, y, x):
with self.lock:
return super().get(y, x)
def set(self, y, x, state):
with self.lock:
return super().set(y, x, state)
# Example 2
from threading import Thread
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
# def game_logic(state, neighbors):
# # Do some blocking input/output in here:
# data = my_socket.recv(100)
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)
def simulate_threaded(grid):
next_grid = LockingGrid(grid.height, grid.width)
threads = []
for y in range(grid.height):
for x in range(grid.width):
args = (y, x, grid.get, next_grid.set)
thread = Thread(target=step_cell, args=args)
thread.start() # Fan out
threads.append(thread)
for thread in threads:
thread.join() # Fan in
return next_grid
# Example 3
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
def run_simulation():
grid = LockingGrid(50, 90) # Changed
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
import time
st=time.time()
for i in range(100):
columns.append(str(grid))
grid = simulate_threaded(grid) # Changed
print(f'simulation time: {time.time() - st}')
print(columns)
cProfile.run('run_simulation()','threaded_prof')
with open('threaded_profile.txt', 'w') as f:
p = pstats.Stats('threaded_prof', stream=f)
p.sort_stats('cumulative').print_stats()
print("Profile saved to 'threaded_profile.txt'")
多线程版本profiling的结果为:
Tue Jul 9 09:06:00 2024 threaded_prof
22071779 function calls in 16.326 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 16.326 16.326 {built-in method builtins.exec}
1 0.000 0.000 16.326 16.326 <string>:1(<module>)
1 0.180 0.180 16.326 16.326 /Users/wenyan/projects2024/efficient_python/fan-out.py:134(run_simulation)
100 0.534 0.005 16.037 0.160 /Users/wenyan/projects2024/efficient_python/fan-out.py:88(simulate_threaded)
450000 0.469 0.000 12.055 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:873(start)
450000 0.397 0.000 8.890 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:556(wait)
450000 0.586 0.000 7.908 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:280(wait)
2250000 7.031 0.000 7.031 0.000 {method 'acquire' of '_thread.lock' objects}
450000 2.621 0.000 2.621 0.000 {built-in method _thread.start_new_thread}
450000 1.053 0.000 2.570 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:795(__init__)
450000 0.150 0.000 0.857 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1021(join)
450000 0.198 0.000 0.798 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:521(__init__)
450000 0.123 0.000 0.620 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1059(_wait_for_tstate_lock)
450000 0.552 0.000 0.552 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:228(__init__)
450000 0.437 0.000 0.475 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:256(__enter__)
450000 0.193 0.000 0.442 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:985(_stop)
450000 0.192 0.000 0.245 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/_weakrefset.py:82(add)
450000 0.098 0.000 0.186 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:768(_maintain_shutdown_locks)
900000 0.144 0.000 0.175 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1338(current_thread)
450000 0.172 0.000 0.172 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1209(_make_invoke_excepthook)
450000 0.081 0.000 0.132 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:268(_acquire_restore)
450000 0.076 0.000 0.122 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:271(_is_owned)
450000 0.114 0.000 0.114 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:750(_newname)
450000 0.093 0.000 0.110 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:259(__exit__)
450000 0.086 0.000 0.110 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:265(_release_save)
900000 0.103 0.000 0.103 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1127(daemon)
900101 0.090 0.000 0.090 0.000 {built-in method _thread.allocate_lock}
450000 0.052 0.000 0.068 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/_weakrefset.py:39(_remove)
900000 0.068 0.000 0.068 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:529(is_set)
450000 0.049 0.000 0.060 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:778(<listcomp>)
450000 0.053 0.000 0.053 0.000 {method 'add' of 'set' objects}
900000 0.040 0.000 0.040 0.000 {method 'release' of '_thread.lock' objects}
900105 0.039 0.000 0.039 0.000 {method '__exit__' of '_thread.lock' objects}
450000 0.038 0.000 0.038 0.000 {method '__enter__' of '_thread.lock' objects}
450000 0.036 0.000 0.036 0.000 {method '__exit__' of '_thread.RLock' objects}
900000 0.031 0.000 0.031 0.000 {built-in method _thread.get_ident}
450000 0.029 0.000 0.029 0.000 {method 'difference_update' of 'set' objects}
2 0.006 0.003 0.024 0.012 {built-in method builtins.print}
900100 0.022 0.000 0.022 0.000 {method 'locked' of '_thread.lock' objects}
450000 0.022 0.000 0.022 0.000 {method 'append' of 'collections.deque' objects}
455150 0.020 0.000 0.020 0.000 {method 'append' of 'list' objects}
1 0.003 0.003 0.018 0.018 /Users/wenyan/projects2024/efficient_python/fan-out.py:113(__str__)
100 0.000 0.000 0.017 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:35(__str__)
100 0.017 0.000 0.017 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:22(__str__)
450000 0.016 0.000 0.016 0.000 {method 'discard' of 'set' objects}
5200 0.014 0.000 0.014 0.000 {method 'splitlines' of 'str' objects}
101 0.000 0.000 0.001 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:31(__init__)
101 0.001 0.000 0.001 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:9(__init__)
5200 0.000 0.000 0.000 0.000 {built-in method builtins.max}
5300 0.000 0.000 0.000 0.000 {built-in method builtins.len}
1 0.000 0.000 0.000 0.000 {method 'join' of 'str' objects}
100 0.000 0.000 0.000 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:110(append)
5 0.000 0.000 0.000 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:43(set)
5 0.000 0.000 0.000 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:19(set)
2 0.000 0.000 0.000 0.000 {built-in method time.time}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
1 0.000 0.000 0.000 0.000 /Users/wenyan/projects2024/efficient_python/fan-out.py:107(__init__)
解读:
Tue Jul 9 09:06:00 2024 threaded_prof
22071779 function calls in 16.326 seconds
Ordered by: cumulative time
这部分提供了文件生成的时间、总共调用了多少次函数、程序执行的总时间,以及结果的排序方式(这里是按累积时间排序的)。
ncalls tottime percall cumtime percall filename:lineno(function)
ncalls: 函数被调用的次数。
tottime: 在该函数中花费的总时间,不包括调用其他函数的时间。
percall: 平均每次调用该函数花费的时间,即 tottime/ncalls。
cumtime: 在该函数及其所有子函数中花费的总时间。
percall: 平均每次调用该函数及其所有子函数花费的时间,即 cumtime/ncalls。
filename:lineno(function): 函数定义所在的文件名和行号,以及函数名。
下面的是主模块和执行环境
1 0.000 0.000 16.326 16.326 {built-in method builtins.exec}
1 0.000 0.000 16.326 16.326 <string>:1(<module>)
1 0.180 0.180 16.326 16.326 /Users/wenyan/projects2024/efficient_python/fan-out.py:134(run_simulation)
这些行表示执行的主模块和调用链。run_simulation 函数总耗时 16.326 秒。
下面的是主要函数
100 0.534 0.005 16.037 0.160 /Users/wenyan/projects2024/efficient_python/fan-out.py:88(simulate_threaded)
simulate_threaded 函数被调用 100 次,总耗时 16.037 秒,平均每次调用 0.160 秒。 被调用100次,与我们代码的100次的for循环可以对应起来。
下面的是线程管理部分
450000 0.469 0.000 12.055 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:873(start)
450000 0.397 0.000 8.890 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:556(wait)
450000 0.586 0.000 7.908 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:280(wait)
2250000 7.031 0.000 7.031 0.000 {method 'acquire' of '_thread.lock' objects}
这些行显示线程的启动和等待占用了大量时间。start 方法的累计耗时为 12.055 秒,其中包含它内部调用的 Event.wait(8.890 秒)和 Condition.wait(7.908 秒);而这些等待的时间又大部分花在底层锁的 acquire 方法上(7.031 秒)。注意这几项是层层嵌套的累计时间(cumtime),不能简单相加。
线程函数
450000 2.621 0.000 2.621 0.000 {built-in method _thread.start_new_thread}
450000 1.053 0.000 2.570 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:795(__init__)
内置函数 _thread.start_new_thread(真正创建新线程的调用)占用了 2.621 秒,Thread 对象的初始化(__init__ 方法)累计耗时 2.570 秒。
锁和状态管理
450000 0.150 0.000 0.857 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1021(join)
450000 0.198 0.000 0.798 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:521(__init__)
线程的 join 方法(等待线程结束)累计耗时 0.857 秒;threading.py:521 的 __init__(在 Python 3.9 中是 Event 对象的初始化,它与上面的 is_set、wait 属于同一个类)累计耗时 0.798 秒。
辅助函数
900000 0.144 0.000 0.175 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1338(current_thread)
450000 0.172 0.000 0.172 0.000 /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py:1209(_make_invoke_excepthook)
一些辅助函数如 current_thread 和 _make_invoke_excepthook 虽然调用次数很多(分别为 90 万次和 45 万次),但耗时很少。
总结:
性能分析结果显示,线程的启动和管理占用了大量时间,特别是在锁的获取和等待上。这是程序比单线程慢的主要原因。要优化多线程程序,可以考虑以下几种方法:
减少线程的启动和管理: 尽量减少线程的创建次数,使用线程池等方法来管理线程。 优化锁的使用: 尽量减少锁的争用和等待时间,使用更高效的锁机制。 减少 I/O 操作: 如果程序有大量的 I/O 操作,考虑使用异步 I/O 来提高性能。
我们把单线程的程序同样使用cProfile进行修改,然后查看profiling的结果。
import cProfile
import pstats
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
alive = {(9, 5), (9, 6)}
seen = set()
def fake_get(y, x):
position = (y, x)
seen.add(position)
return ALIVE if position in alive else EMPTY
count = count_neighbors(10, 5, fake_get)
assert count == 2
expected_seen = {
(9, 5), (9, 6), (10, 6), (11, 6),
(11, 5), (11, 4), (10, 4), (9, 4)
}
assert seen == expected_seen
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
assert game_logic(ALIVE, 0) == EMPTY
assert game_logic(ALIVE, 1) == EMPTY
assert game_logic(ALIVE, 2) == ALIVE
assert game_logic(ALIVE, 3) == ALIVE
assert game_logic(ALIVE, 4) == EMPTY
assert game_logic(EMPTY, 0) == EMPTY
assert game_logic(EMPTY, 1) == EMPTY
assert game_logic(EMPTY, 2) == EMPTY
assert game_logic(EMPTY, 3) == ALIVE
assert game_logic(EMPTY, 4) == EMPTY
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)
alive = {(10, 5), (9, 5), (9, 6)}
new_state = None
def fake_get(y, x):
return ALIVE if (y, x) in alive else EMPTY
def fake_set(y, x, state):
global new_state
new_state = state
# Stay alive
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE
# Stay dead
alive.remove((10, 5))
step_cell(10, 5, fake_get, fake_set)
assert new_state == EMPTY
# Regenerate
alive.add((10, 6))
step_cell(10, 5, fake_get, fake_set)
assert new_state == ALIVE
def simulate(grid):
next_grid = Grid(grid.height, grid.width)
for y in range(grid.height):
for x in range(grid.width):
step_cell(y, x, grid.get, next_grid.set)
return next_grid
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
def run_simulation():
grid = Grid(50, 90)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
import time
st=time.time()
for i in range(100):
columns.append(str(grid))
grid = simulate(grid)
print(f'simulate time: {time.time() - st}')
print(columns)
cProfile.run('run_simulation()','threaded_prof_single_thread')
with open('threaded_profile_single_thread.txt', 'w') as f:
p = pstats.Stats('threaded_prof_single_thread', stream=f)
p.sort_stats('cumulative').print_stats()
print("Profile saved to 'threaded_profile_single_thread.txt'")
结果为:
Tue Jul 9 23:53:23 2024 threaded_prof_single_thread
5871267 function calls in 1.004 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 1.004 1.004 {built-in method builtins.exec}
1 0.000 0.000 1.004 1.004 <string>:1(<module>)
1 0.001 0.001 1.004 1.004 /Users/wenyan/projects2024/efficient_python/conway_game.py:163(run_simulation)
100 0.055 0.001 0.963 0.010 /Users/wenyan/projects2024/efficient_python/conway_game.py:127(simulate)
450000 0.149 0.000 0.907 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:96(step_cell)
450000 0.359 0.000 0.648 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:39(count_neighbors)
4050000 0.328 0.000 0.328 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:15(get)
450005 0.046 0.000 0.046 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:18(set)
450000 0.024 0.000 0.024 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:73(game_logic)
2 0.005 0.003 0.023 0.012 {built-in method builtins.print}
1 0.003 0.003 0.018 0.018 /Users/wenyan/projects2024/efficient_python/conway_game.py:142(__str__)
100 0.016 0.000 0.016 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:21(__str__)
5200 0.014 0.000 0.014 0.000 {method 'splitlines' of 'str' objects}
101 0.001 0.000 0.001 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:8(__init__)
5200 0.000 0.000 0.000 0.000 {built-in method builtins.max}
5150 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
5300 0.000 0.000 0.000 0.000 {built-in method builtins.len}
100 0.000 0.000 0.000 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:139(append)
1 0.000 0.000 0.000 0.000 {method 'join' of 'str' objects}
2 0.000 0.000 0.000 0.000 {built-in method time.time}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
1 0.000 0.000 0.000 0.000 /Users/wenyan/projects2024/efficient_python/conway_game.py:136(__init__)
可以看到单线程版本之所以快,是因为没有线程管理的开销。需要注意的是,这次对比所用的 game_logic 并没有执行真正的阻塞式 I/O,多线程版本没有任何可以重叠的等待时间,所以创建、启动和等待线程的成本就全部变成了额外开销。
所以再回顾一下这一节的总结:
目前的多线程实现有比较大的缺点。
第一,要专门采用工具来协调这些Thread实例,才能保证它们不会破坏程序中的数据。这会让多线程版本的代码理解起来比较困难,因为早前的单线程版是按照先后顺序实现的,不需要有专门的协调机制,所以读起来比现在的版本好懂。这种复杂的多线程代码,随时间的推移更加难以扩展与维护。
第二,线程占用的内存比较多,每条线程大约占8MB。当然,现在的许多电脑完全能承担起本例中45条线程所占用的内存总量。但如果游戏里有一万个单元格,那就必须创建一万条线程,这个内存量恐怕很多电脑不支持。所以,给每项操作都新开一条线程是不现实的。
第三,线程的开销比较大。系统频繁地切换线程,会降低程序的运行效率。对于本例来说,每推进一代,就要停止一批旧线程并启动一批新线程,这样开销很大,于是程序除了要花100毫秒等待I/O操作结束,还会因为停止并启动线程而耽误一些时间。
每次都手工创建一批线程,是有很多缺点的,例如:创建并运行大量线程时的开销比较大,每条线程的内存占用量比较多,而且还必须采用Lock等机制来协调这些线程。