python并发与并行(八) ———— 用协程实现高并发的I/O
Published:
在前面几条里,我们以生命游戏为例,试着用各种方案解决I/O并行问题,这些方案在某些情况下确实可行,但如果同时需要执行的I/O任务有成千上万个,那么这些方案的效率就不太理想了
像这种在并发方面要求比较高的I/O需求,可以用Python的协程(coroutine)来解决。协程能够制造出一种效果,让我们觉得Python程序好像真的可以同时执行大量任务。这种效果需要使用async与await关键字来实现,它的基本原理与生成器(generator)类似,也就是不立刻给出所有的结果,而是等需要用到的时候再一项一项地获取
启动协程是有代价的,就是必须做一次函数调用。协程激活之后,只占用不到1KB内存,所以只要内存足够,协程稍微多一些也没关系。与线程类似,协程所要执行的任务也是用一个函数来表示的,在执行这个函数的过程中,协程可以从执行环境里面获取输入值,并把输出结果放到这个执行环境之中。协程与线程的区别在于,它不会把这个函数从头到尾执行完,而是每遇到一个await表达式,就暂停一次,下次继续执行的时候,它会先等待await所针对的那项awaitable操作有了结果(那项操作是用async函数表示的),然后再推进到下一个await表达式那里(这跟生成器函数的运作方式有点像,那种函数也是一遇到yield就暂停)。
Python系统可以让数量极多的async函数各自向前推进,看起来像很多条Python线程那样,能够并发地运行。然而,这些协程并不会像线程那样占用大量内存,启动和切换的开销也比较小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的,只要把相关的函数写对,这种循环就可以穿插着执行许多个这样的函数,并且执行得相当快,从而高效地完成并发式的I/O任务。
现在就用协程来实现生命游戏。我们的目标是让游戏能够高效地执行game_logic函数里面的I/O操作,同时又不像前面提到的Thread方案与Queue方案那样,有那么多缺点。首先修改game_logic函数,这次必须在定义函数所用的那个def关键字前面,加上async,表示该函数是一个协程,这样我们就可以在函数里面用await做I/O了(例如从套接字(socket)之中异步读取一份数据)。同理,给step_cell函数也添上async关键字,把它变为协程,并在调用game_logic的那个地方使用await关键字。 
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
# async def game_logic(state, neighbors):
# # Do some input/output in here:
# data = await my_socket.read(50)
async def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
async def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = await game_logic(state, neighbors)
set(y, x, next_state)
simulate函数也同样需要变为协程。
import asyncio
async def simulate(grid):
next_grid = Grid(grid.height, grid.width)
tasks = []
for y in range(grid.height):
for x in range(grid.width):
task = step_cell(
y, x, grid.get, next_grid.set) # Fan out
tasks.append(task)
await asyncio.gather(*tasks) # Fan in
return next_grid
async版本的simulate函数,有以下几个地方需要解释: ▪ 第一,它在调用step_cell的时候,系统并不会立刻执行这个函数,而是会返回一个协程实例,稍后会把这个实例写在await表达式里面。这里的step_cell,好比那种用yield写的生成器函数一样,调用时并不立刻执行它,而是返回一个生成器实例。这样就可以实现任务fan-out(分派)模式了。 ▪ 第二,内置的asyncio模块提供了gather函数,可以用来实现fan-in(归集)模式。把gather写在await表达式里面,可以让系统用事件循环去并发地执行那些step_cell协程,并在全部执行完之后,再往下推进simulate协程。 ▪ 第三,由于这些代码都是在同一条线程里执行的,因此不需要给Grid(网格)实例加锁,至于怎样让这些I/O操作表现出平行的效果,则是由asyncio所提供的事件循环来负责的。
最后,要调整原范例之中用来推动游戏流程的那段代码。我们只需要修改一行代码,也就是把simulate(grid)这个协程交给asyncio.run去运行,从而利用事件循环机制去执行推进单元格状态所需的那些I/O操作。
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
logging.getLogger().setLevel(logging.ERROR)
grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
for i in range(5):
columns.append(str(grid))
grid = asyncio.run(simulate(grid)) # Run the event loop
print(columns)
协程的优点是,能够把那些与外部环境交互的代码(例如I/O调用)与那些实现自身需求的代码(例如事件循环)解耦。这让我们可以把重点放在实现需求所用的逻辑上面,而不用专门花时间去写一些代码来确保这些需求能够并发地执行。
我们同样按之前的测一下性能。
# | 配置
性能(s) | 单线程 | 线程方案实现多线程 | 队列方案实现多线程 | ThreadPoolExecutor方案实现多线程 | 协程方式 | |————————————-|——————–|——————-|——————–|—————————|——————–| | Grid(500,900)
step 100次
线程数 5 | 55.45792198181152 | | 170.44810271263123 | 6410.6107659339905 | 382.2381818294525 | | Grid(50,90)
step 100次
线程数 5 | 0.6782510280609131 | 14.29249095916748 | 2.5422348976135254 | 5.175195217132568 | 2.2674009799957275 |