python并发与并行(七) ———— 如果必须用线程做并发,那就考虑通过ThreadPoolExecutor实现
Published:
Python有个内置模块叫作concurrent.futures,它提供了ThreadPoolExecutor类。这个类结合了线程(Thread)方案与队列(Queue)方案的优势,可以用来平行地处理康威生命游戏里的那种I/O操作(参见前面讲的线程方案和队列方案)。
我们把之前的代码拿过来进行更改。
# Example 1
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
from threading import Lock
class LockingGrid(Grid):
def __init__(self, height, width):
super().__init__(height, width)
self.lock = Lock()
def __str__(self):
with self.lock:
return super().__str__()
def get(self, y, x):
with self.lock:
return super().get(y, x)
def set(self, y, x, state):
with self.lock:
return super().set(y, x, state)
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
# def game_logic(state, neighbors):
# # Do some blocking input/output in here:
# data = my_socket.recv(100)
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)
这次在把游戏推进到下一代的时候,我们不针对每个单元格启动新的Thread实例,而是把推进每个单元格状态的那个函数与必要的参数提交给ThreadPoolExecutor,让执行器自己安排线程去执行这些状态更新任务,这样就实现了fan-out(分派)。稍后,可以等待提交过去的所有任务都执行完毕,然后再把整张网格正式推进到下一代,这样就实现了fan-in(归集)。
from concurrent.futures import ThreadPoolExecutor
def simulate_pool(pool, grid):
next_grid = LockingGrid(grid.height, grid.width)
futures = []
for y in range(grid.height):
for x in range(grid.width):
args = (y, x, grid.get, next_grid.set)
future = pool.submit(step_cell, *args) # Fan out
futures.append(future)
for future in futures:
future.result() # Fan in
return next_grid
用来推进游戏状态的这些线程可以提前分配,不用每次执行simulate_pool都分配一遍,这样能够降低启动线程的开销。另外,线程池里的最大线程数可以通过max_workers参数手工指定,这样能把线程数量限制在一定范围内,而不像最早的那个方案那样,每执行一项I/O操作,就启动一条线程,那样会导致内存用量激增。
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
for i in range(5):
columns.append(str(grid))
grid = simulate_pool(pool, grid)
print(columns)
ThreadPoolExecutor类的最大优点在于:如果调用者通过submit方法把某项任务提交给它执行,那么会获得一个与该任务相对应的Future实例,当调用者在这个实例上通过result方法获取执行结果时,ThreadPoolExecutor会把它在执行任务的过程中所遇到的异常自动抛给调用者。
ThreadPoolExecutor方案仍然有个很大的缺点,就是I/O并行能力不高,即便把max_workers设成100,也无法高效地应对那种有一万多个单元格,且每个单元格都要同时做I/O的情况。如果你面对的需求,没办法用异步方案解决,而是必须执行完才能往后走(例如文件I/O),那么ThreadPoolExecutor是个不错的选择。
我们同样按照之前的修改,把Gird size改为(500,900),迭代次数100次,线程数设置为5,结果为:
2%|█▍ | 2/100 [00:17<14:34, 8.92s/it]
比之前的结果都要离谱。
我们总结一下之前使用方法的性能:
# | 配置
性能(s) | 单线程 | 线程方案实现多线程 | 队列方案实现多线程 | ThreadPoolExecutor方案实现多线程 | |————————————-|——————–|——————-|——————–|—————————| | Grid(500,900)
step 100次
线程数 5 | 55.45792198181152 | | 170.44810271263123 | 6410.6107659339905 | | Grid(50,90)
step 100次
线程数 5 | 0.6782510280609131 | 14.29249095916748 | 2.5422348976135254 | 5.175195217132568 |
可以看到ThreadPoolExcutoThreadPoolThreadPoolExcutor的并发性能并不高。