python并发与并行(八) ———— 用协程实现高并发的I/O

3 minute read

Published:

在前面几条里,我们以生命游戏为例,试着用各种方案解决I/O并行问题,这些方案在某些情况下确实可行,但如果同时需要执行的I/O任务有成千上万个,那么这些方案的效率就不太理想了

像这种在并发方面要求比较高的I/O需求,可以用Python的协程(coroutine)来解决。协程能够制造出一种效果,让我们觉得Python程序好像真的可以同时执行大量任务。这种效果需要使用async与await关键字来实现,它的基本原理与生成器(generator)类似,也就是不立刻给出所有的结果,而是等需要用到的时候再一项一项地获取

启动协程是有代价的,就是必须做一次函数调用。协程激活之后,只占用不到1KB内存,所以只要内存足够,协程稍微多一些也没关系。与线程类似,协程所要执行的任务也是用一个函数来表示的,在执行这个函数的过程中,协程可以从执行环境里面获取输入值,并把输出结果放到这个执行环境之中。协程与线程的区别在于,它不会把这个函数从头到尾执行完,而是每遇到一个await表达式,就暂停一次,下次继续执行的时候,它会先等待await所针对的那项awaitable操作有了结果(那项操作是用async函数表示的),然后再推进到下一个await表达式那里(这跟生成器函数的运作方式有点像,那种函数也是一遇到yield就暂停)。

Python系统可以让数量极多的async函数各自向前推进,看起来像很多条Python线程那样,能够并发地运行。然而,这些协程并不会像线程那样占用大量内存,启动和切换的开销也比较小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的,只要把相关的函数写对,这种循环就可以穿插着执行许多个这样的函数,并且执行得相当快,从而高效地完成并发式的I/O任务。

现在就用协程来实现生命游戏。我们的目标是让游戏能够高效地执行game_logic函数里面的I/O操作,同时又不像前面提到的Thread方案与Queue方案那样,有那么多缺点。首先修改game_logic函数,这次必须在定义函数所用的那个def关键字前面,加上async,表示该函数是一个协程,这样我们就可以在函数里面用await做I/O了(例如从套接字(socket)之中异步读取一份数据)。同理,给step_cell函数也添上async关键字,把它变为协程,并在调用game_logic的那个地方使用await关键字。 

ALIVE = '*'
EMPTY = '-'

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output

def count_neighbors(y, x, get):
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

# async def game_logic(state, neighbors):
#     # Do some input/output in here:
#     data = await my_socket.read(50)

async def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state


async def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = await game_logic(state, neighbors)
    set(y, x, next_state)

simulate函数也同样需要变为协程。

import asyncio

async def simulate(grid):
    next_grid = Grid(grid.height, grid.width)

    tasks = []
    for y in range(grid.height):
        for x in range(grid.width):
            task = step_cell(
                y, x, grid.get, next_grid.set)      # Fan out
            tasks.append(task)

    await asyncio.gather(*tasks)                    # Fan in

    return next_grid

async版本的simulate函数,有以下几个地方需要解释: ▪ 第一,它在调用step_cell的时候,系统并不会立刻执行这个函数,而是会返回一个协程实例,稍后会把这个实例写在await表达式里面。这里的step_cell,好比那种用yield写的生成器函数一样,调用时并不立刻执行它,而是返回一个生成器实例。这样就可以实现任务fan-out(分派)模式了。 ▪ 第二,内置的asyncio模块提供了gather函数,可以用来实现fan-in(归集)模式。把gather写在await表达式里面,可以让系统用事件循环去并发地执行那些step_cell协程,并在全部执行完之后,再往下推进simulate协程。 ▪ 第三,由于这些代码都是在同一条线程里执行的,因此不需要给Grid(网格)实例加锁,至于怎样让这些I/O操作表现出平行的效果,则是由asyncio所提供的事件循环来负责的。

最后,要调整原范例之中用来推动游戏流程的那段代码。我们只需要修改一行代码,也就是把simulate(grid)这个协程交给asyncio.run去运行,从而利用事件循环机制去执行推进单元格状态所需的那些I/O操作。

class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)

logging.getLogger().setLevel(logging.ERROR)

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = asyncio.run(simulate(grid))   # Run the event loop

print(columns)

协程的优点是,能够把那些与外部环境交互的代码(例如I/O调用)与那些实现自身需求的代码(例如事件循环)解耦。这让我们可以把重点放在实现需求所用的逻辑上面,而不用专门花时间去写一些代码来确保这些需求能够并发地执行。

我们同样按之前的测一下性能。

# | 配置
性能(s) | 单线程 | 线程方案实现多线程 | 队列方案实现多线程 | ThreadPoolExecutor方案实现多线程 | 协程方式 | |————————————-|——————–|——————-|——————–|—————————|——————–| | Grid(500,900)
step 100次
线程数 5 | 55.45792198181152 | | 170.44810271263123 | 6410.6107659339905 | 382.2381818294525 | | Grid(50,90)
step 100次
线程数 5 | 0.6782510280609131 | 14.29249095916748 | 2.5422348976135254 | 5.175195217132568 | 2.2674009799957275 |