python并发与并行(四) ———— 用queue来协调多个线程之间的工作进度

9 minute read

Published:

Python程序如果要同时执行多项任务,而这些任务又分别针对同一种产品的不同环节,那么就有可能得在它们之间进行协调。比较有用的一种协调方式是把函数拼接成管道。

这样的管道与生产线比较像。它可以按先后顺序划分成几个阶段,每个阶段都由相应的函数负责。程序会把未经加工的原料放在生产线(也就是管道)的起点,而那些函数,则分别关注着自己所负责的这一段,只要有产品来到当前环节,它就对这件产品做出相应的加工处理。如果所有函数都不会再收到有待加工的产品,那么整条生产线就可以关停。这套方案,很适合应对与阻塞式I/O或子进程有关的需求,因为我们很容易就能在Python程序里,平行地开启多个线程来分别负责生产线中的某个环节。

例如,要构建这样一套系统,让它持续从数码相机里获取照片,然后调整照片尺寸,最后把调整好的照片添加到网络相册之中。如果用管道来实现,那么这个管道就有三个环节。第一个环节是,从数码相机里下载新图像;第二个环节是,调整这些图像的尺寸;第三个环节是,把尺寸已经调整好的图像上传到网络相册里面。 假如这三个环节所对应的download、resize与upload函数,如下面的示例程序所示。

def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

首先,必须想办法表示每个环节所要加工的产品,并让加工好的产品能够为下一个环节所获取。这可以用线程安全的生产-消费队列(producer-consumer queue,也叫生产者-消费队列)来实现。

from collections import deque
from threading import Lock

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

内容扩展


deque 是 Python 标准库 collections 中的一个双端队列(double-ended queue)数据结构。它支持从队列的两端高效地添加和弹出元素,这使得它在某些场景下比常规的 Python 列表更加高效。以下是 deque 的一些主要用法和特性:

  • 初始化:你可以使用一个可迭代对象(如列表)来初始化 deque。
    from collections import deque  
    d = deque([1, 2, 3])
    
  • 添加元素:使用 append() 和 appendleft() 方法可以在队列的右端和左端添加元素。
    d.append(4)       # 在右端添加元素  
    d.appendleft(0)   # 在左端添加元素
    
  • 弹出元素:使用 pop() 和 popleft() 方法可以从队列的右端和左端移除并返回元素。
    right_element = d.pop()    # 从右端弹出元素  
    left_element = d.popleft() # 从左端弹出元素
    
  • 查看元素但不移除:可以使用 right 或 left 索引来查看但不移除队列两端的元素。或者使用 peekleft() 和 peekright()(在 Python 3.10.0 之后的版本中已弃用)方法。
    right_element = d[-1]  # 查看右端元素  
    left_element = d[0]    # 查看左端元素
    
  • 旋转:deque 提供了一个 rotate() 方法,它可以将 deque 中的元素循环移位。正数表示向右旋转,负数表示向左旋转。
    d.rotate(1)  # 向右旋转1个位置  
    d.rotate(-1) # 向左旋转1个位置
    
  • 设置最大长度:在创建 deque 时,你可以设置一个最大长度。当队列达到这个长度时,新添加的元素会导致另一端的元素被弹出。
    d = deque(maxlen=5)
    
  • 其他方法: count(value): 返回队列中值为 value 的元素个数。

index(value, [start, [stop]]): 返回队列中第一个值为 value 的元素的索引,如果没有找到则抛出异常。可选的 start 和 stop 参数用于限制搜索范围。

extend(iterable): 在队列的右端扩展多个元素。

extendleft(iterable): 在队列的左端扩展多个元素。

  • 性能特点:

在队列的两端添加或弹出元素的时间复杂度都是 O(1)。而在列表中进行相同的操作,其时间复杂度为 O(n)。因此,在处理需要频繁在两端添加或移除元素的场景时,deque 比列表更加高效。

  • 内存效率:

相比于列表,deque 在内存使用上也更加高效,因为它不需要像列表那样预留额外的空间以应对可能的扩展。当 deque 需要增长时,它会动态地分配所需的内存。

  • 线程安全:

deque虽然是线程安全的,但是deque的线程安全是由GIL的特性来提供的,在单线程的环境下是线程安全的,在多线程条件下,GIL的切换仍然会导致数据线程不安全,如果在多线程环境中使用,需要额外的同步机制来避免数据竞争。

总的来说,deque 是一个在处理需要频繁在两端添加或移除元素的场景时非常有用的数据结构。


首先定义put方法,让生产者(也就是数码相机)可以通过这个方法把新图像添加到deque的尾部。

def put(self, item):
    with self.lock:
        self.items.append(item)

然后定义get方法。get方法是第一阶段的消费者,也就是需要下载照片的函数,可以通过这个方法从deque的前端(即左侧)获取元素。

def get(self):
    with self.lock:
        return self.items.popleft()

我们把上面两个函数组合成一个类

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()

我们把管道的每个阶段都表示成一条Python线程,它会从刚才那样的队列中取出有待处理的产品,并交给对应的函数去处理,接着再把处理结果放到下一个队列之中。另外,我们再添加两个字段,分别记录这条线程向上游队列查询产品的次数以及完成加工的次数。

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

最难处理的地方在于,如果上游环节的速度比较慢,导致它不能及时把加工过的产品添加到本环节的输入队列里面,那么输入队列就有可能出现空白,使得当前环节暂时获取不到有待加工的产品。我们通过捕捉IndexError来处理这种上游发生延迟的情况。

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)  # No work to do
            except AttributeError:
                # The magic exit signal
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

现在,创建四个队列,并在它们之间安排三条工作线程,让每条线程都从上游队列里面获取元素,并把加工过的元素放到下游队列之中。

download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

启动这些线程,然后给管道第一环节填入一大批原材料。在这里采用普通的object实例模拟download函数所要下载的真实数据。

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

反复查询最后那个队列(也就是done_queue)里的元素数量,如果这个数量与一开始的原材料数量相同,那就说明整条管道已经把所有产品全都加工好了。

while len(done_queue.items) < 1000:
    # Do something useful while waiting
    time.sleep(0.1)
# Stop all the threads by causing an exception in their
# run methods.
for thread in threads:
    thread.in_queue = None
    thread.join()

processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'Processed {processed} items after '
      f'polling {polled} times')

完整代码如下:


def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item


from collections import deque
from threading import Lock

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()


from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0


    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                print('Worker class, in_queue.get IndexError')
                time.sleep(0.01)  # No work to do
            except AttributeError:
                # The magic exit signal
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1


download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]


for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())


while len(done_queue.items) < 1000:
    # Do something useful while waiting
    time.sleep(0.1)
# Stop all the threads by causing an exception in their
# run methods.
for thread in threads:
    thread.in_queue = None
    thread.join()


processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'Processed {processed} items after '
      f'polling {polled} times')

Output:

Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Worker class, in_queue.get IndexError
Processed 1000 items after polling 3033 times

我们可以看到在流水线工作中,查询的总次数是大于要处理的总数量的。有33次是消费快于生产的,所以出现了索引失败的情况。由于每个环节的加工速度可能不太一样,因此下游环节或许会遇到暂时没有产品可以加工的情况,这会拖延管道的总进度。下游环节必须频繁查询上游队列,才能尽快发现并获取到自己所要加工的产品。这相当于是在浪费CPU资源,因为在这种情况下,这些线程只是在触发并捕获IndexError,而没有做实际的工作。

我们再总结一下这个代码实现的功能:

这个代码实现了一个简单的多线程处理管道,用于模拟下载、调整大小和上传三个处理步骤。以下是该代码的详细解释:

函数定义: download(item), resize(item), 和 upload(item) 函数在这里都是简单的占位函数,它们仅仅返回传入的 item。在实际应用中,这些函数会执行相应的下载、调整图片大小和上传操作。

队列类: MyQueue 是一个线程安全的队列类,使用 collections.deque 作为其内部数据结构,并通过 threading.Lock 确保多个线程同时访问和修改队列时的线程安全。 put 方法用于向队列添加元素,而 get 方法用于从队列中取出元素。

工作线程类: Worker 类继承自 Thread,表示一个工作线程。 每个 Worker 都有一个输入队列 in_queue 和一个输出队列 out_queue。它不断地从 in_queue 中取出项目,使用其特定的函数(如 download、resize、upload)处理这些项目,并将处理后的结果放入 out_queue。 polled_count 用于记录线程尝试从输入队列中获取项目的次数,而 work_done 记录已完成的工作量。

主程序流程: 初始化三个工作队列:download_queue、resize_queue 和 upload_queue,以及一个 done_queue 用于存储所有处理完成的项目。 创建三个 Worker 线程,分别负责下载、调整大小和上传任务。 启动这三个线程。 向 download_queue 中放入1000个对象作为处理任务。 主线程等待,直到 done_queue 中的项目数量达到1000,这意味着所有的项目都已经被处理完毕。 为了优雅地停止所有线程,主线程将每个 Worker 的 in_queue 设置为 None,这会导致 Worker 在尝试从队列中获取项目时引发 AttributeError 异常,从而退出其运行循环。 最后,程序打印出处理的项目数量和总的轮询次数。

总的来说,这个代码模拟了一个多线程的图片处理管道,其中包括下载、调整大小和上传三个步骤。每个步骤都由一个单独的线程处理,并通过线程安全的队列进行通信。

下面说一下这个程序的缺点:

第一,为了判断全部产品是否加工完毕,必须像Worker线程里的run方法那样,反复查询最后那个队列,以确认里面的元素个数是否已经变得与刚开始的原料总数相同。

第二,目前这种方案会使run方法陷入无限循环,我们没办法明确通知线程何时应该退出。

第三,如果下游环节的处理速度过慢,那么程序随时都有可能崩溃,这是最严重的问题。例如,如果第一个环节处理得很快,而第二个环节处理得比较慢,那么连接这两个环节的那个队列就会迅速膨胀,因为它里面堆积了大量的产品等着第二个环节来加工,可是第二个环节又跟不上节奏。时间久了,数据会越积越多,导致程序因为耗尽内存而崩溃。

总之,这种需求不适合用管道来实现,因为很难构建出良好的生产-消费队列。

改用Queue来实现

内置的queue模块里有个Queue类,它提供了解决上述问题所需的所有功能。 改用Queue之后,就不用再频繁查询是否有新产品要加工了,因为它的get方法会一直阻塞在那里,直至有新数据返回为止。例如,我们可以启动这样一条消费线程,以等待队列里面出现新的输入数据:

from queue import Queue

my_queue = Queue()

def consumer():
    print('Consumer waiting')
    my_queue.get()              # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

即便这个线程先启动,也没有关系,因为只有当生产线程通过Queue实例的put方法给队列里面填入新数据之后,刚才那个get方法才有数据可以返回。

print('Producer putting')
my_queue.put(object())          # Runs before get() above
print('Producer done')
thread.join()

完整代码为:

from queue import Queue
from threading import Thread

my_queue = Queue()

def consumer():
    print('Consumer waiting')
    my_queue.get()              # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()


print('Producer putting')
my_queue.put(object())          # Runs before get() above
print('Producer done')
thread.join()

Output:

Consumer waiting
Producer putting
Producer done
Consumer done

可以看到Consumer done是最后被打印出来的。程序会在my_queue.put()运行后才运行my_queue.get()。

为了解决因下游环节速度过慢而造成的管道拥堵问题,我们可以限定Queue最多只能堆积多少个元素。如果通过put方法给已经填满的队列添加新元素,那么这个方法就会阻塞,直到队列里有空位为止。下面我们创建最多只能保存一个元素的队列,并且定义这样一条消费线程,让它先等待一段时间,然后再从队列中获取元素,这样就促使生产线程没办法立刻给队列中添加新元素。

my_queue = Queue(1)             # Buffer size of 1

def consumer():
    time.sleep(0.1)             # Wait
    my_queue.get()              # Runs second
    print('Consumer got 1',flush=True)
    my_queue.get()              # Runs fourth
    print('Consumer got 2',flush=True)
    print('Consumer done',flush=True)

thread = Thread(target=consumer)
thread.start()

my_queue.put(object())          # Runs first
print('Producer put 1',flush=True)
my_queue.put(object())          # Runs third
print('Producer put 2',flush=True)
print('Producer done',flush=True)
thread.join()

Output:

Producer put 1
Consumer got 1
Consumer got 2
Consumer done
Producer put 2
Producer done

但是这个程序多运行几次,会发现偶尔一次的运行结果不一样。这个与多线程执行的复杂性有关。

我们多运行几次,有的结果会是这样:

Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done
Consumer done

在这个代码中,time.sleep(0.1) 在 consumer 函数中起到的主要作用是引入一个短暂的延迟,模拟消费者线程在处理数据前的一些处理时间或者等待时间。 由于这段代码是在多线程环境中运行的,consumer 函数和主线程(生产者)会并发执行。如果没有 time.sleep(0.1) 这一行,消费者线程可能会立即尝试从队列中获取数据,但此时生产者可能还没有来得及放入任何数据,这会导致消费者线程被阻塞,直到有数据被放入队列。 通过加入 time.sleep(0.1),消费者线程会延迟0.1秒再开始从队列中获取数据。这个延迟给了生产者线程一些时间来将对象放入队列中。这样,当消费者线程开始从队列中获取数据时,生产者可能已经将一些数据放入了队列。

但由于线程执行的不确定性,这种保证不是绝对的。

Queue类还可以通过task_done方法告诉程序它已经把其中一个元素处理完了。这样的话,就不用像早前对待done_queue那样,反复查询生产线末端的那个队列了,因为我们可以通过配套的join方法确认这个队列中的所有元素都已经处理完毕。下面我们定义一个消费线程,令它在处理完一个元素之后,调用task_done方法。

in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()       # Done second
    print('Consumer working')
    # Doing work
    print('Consumer done')
    in_queue.task_done()        # Done third

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
in_queue.put(object())         # Done first
print('Producer waiting')
in_queue.join()                # Done fourth
print('Producer done')
thread.join()

Output:

Consumer waiting
Producer putting
Producer waiting
Consumer working
Consumer done
Producer done

in_queue.join()等待队列中的所有工作都被完成。这个方法会阻塞,直到队列中的所有任务都被标记为完成(通过in_queue.task_done())。

有了这个方法,就不用反复查询队列中的数据有没有处理完了,而是只需在Queue实例上面调用join就行。即便队列中的元素已经全部取走,只要task_done方法的执行次数不足,join就会卡住,直到早前加入队列的每个元素都调用一次task_done方法为止。

虽然这个示例很简单,但它展示了多线程编程中的一个基本模式:生产者-消费者模式。在这个模式中,生产者负责生成数据或任务,而消费者负责处理这些数据或任务。队列在这里起到了一个关键的作用,它允许生产者和消费者在不同的速度和时间上工作,而不需要彼此直接通信。

现在,我们把这些特性结合起来,构建这样一个Queue子类,这个子类还需要判断工作线程何时应该彻底结束任务。为了实现这项功能,我们定义close方法,让它把特殊的标志元素(sentinel item)放入队列,用以表示这个元素之后不会再有新的数据需要加工了。

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

然后,为这种队列定义迭代器,令这个迭代器在发现标志元素之后,停止迭代。我们还需要在__iter__方法(参见第31条)里面适时地调用task_done,用以追踪队列的工作进度。

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)


    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()

把ClosableQueue类写好之后,我们来重新定义工作线程。这次我们通过for循环来迭代队列,只要队列里的元素用尽,线程就可以退出[3]。

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

工作线程启动后,我们依然向第一阶段的输入队列填充原料,只不过这次,在填充完之后还需要调用close方法,向队列中加入特殊的终止信号。

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

download_queue.close()

然后,在队列上调用join方法,等待相应的线程把这个队列中的所有元素都取走。每处理完一个队列,就在下一个队列上调用close方法,向它推送停止信号,并等待相应的线程把那个队列里的元素也全部取走。最终,done_queue队列里包含的就是预期的所有输出成品。

download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')

for thread in threads:
    thread.join()

完整代码为:

from queue import Queue
from threading import Thread
import time
from queue import Queue

def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)


    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]


for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

download_queue.close()


download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')

for thread in threads:
    thread.join()

Output:

1000 items finished

这个方案还可以扩展,也就是用多个线程同时处理某一个环节,以提高I/O并行度,从而大幅提升程序效率。为了实现这种效果,定义两个辅助函数分别用来开启和关闭一组线程。其中,负责关闭线程的那个stop_threads函数要根据线程数量来相应地调用close方法,这样才能让每一条使用该队列的线程都能够查询到一个充当退出标志的特殊元素,从而正常地退出。

def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()

    closable_queue.join()

    for thread in threads:
        thread.join()

我们还是像原来那样,把队列与线程拼接起来,并向管道顶部填充原材料,然后依次关闭各队列以及使用该队列的那组线程,并打印最终结果。

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

download_threads = start_threads(
    3, download, download_queue, resize_queue)
resize_threads = start_threads(
    4, resize, resize_queue, upload_queue)
upload_threads = start_threads(
    5, upload, upload_queue, done_queue)

for _ in range(1000):
    download_queue.put(object())

stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

print(done_queue.qsize(), 'items finished')

完整代码如下:

from queue import Queue
from threading import Thread
import time
from queue import Queue

def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)


    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()

    closable_queue.join()

    for thread in threads:
        thread.join()


download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

download_threads = start_threads(
    3, download, download_queue, resize_queue)
resize_threads = start_threads(
    4, resize, resize_queue, upload_queue)
upload_threads = start_threads(
    5, upload, upload_queue, done_queue)

for _ in range(1000):
    download_queue.put(object())

stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

print(done_queue.qsize(), 'items finished')

管道非常适合用来安排多阶段的任务,让我们能够把每一阶段都交给各自的线程去执行,这尤其适合用在I/O密集型的程序里面。

▪ 构造这种并发的管道时,有很多问题需要注意,例如怎样防止线程频繁地查询队列状态,怎样通知线程尽快结束操作,以及怎样防止管道出现拥堵等。

▪ 我们可以利用Queue类所具有的功能来构造健壮的管道系统,因为这个类提供了阻塞式的入队(put)与出队(get)操作,而且可以限定缓冲区的大小,还能够通过task_done与join来确保所有元素都已处理完毕。