Python 进程、线程和协程实战指南

共 10703字,需浏览 22分钟

 ·

2021-01-29 14:32

1. 前言

前些日子写过几篇关于线程和进程的文章,概要介绍了Python内置的线程模块(threading)和进程模块(multiprocessing)的使用方法,侧重点是线程间同步和进程间同步。随后,陆续收到了不少读者的私信,咨询进程、线程和协程的使用方法,进程、线程和协程分别适用于何种应用场景,以及混合使用进程、线程和协程的技巧。归纳起来,核心的问题大致有以下几个:
  • 使用线程是为了并行还是加速?

  • 为什么我使用多线程之后,处理速度并没有预期的快,甚至更慢了?

  • 我应该选择多进程处理还是多线程处理?

  • 协程和线程有什么不同?

  • 什么情况下使用协程?

在进程、线程和协程的使用上,初学者之所以感到困惑,最主要的原因是对任务的理解不到位。任务是由一个进程、或者线程、或者协程独立完成的、相对独立的一系列工作组合。通常,我们会把任务写成一个函数。任务有3种类型:
  • 计算密集型任务:任务包含大量计算,CPU占用率高

  • IO密集型任务:任务包含频繁的、持续的网络IO和磁盘IO

  • 混合型任务:既有计算也有IO

也有观点认为还有一种数据密集型任务,但我认为数据密集型任务一般出现在分布式系统或异构系统上,必定伴随着计算密集和IO密集,因此,仍然可以归类到混合型任务。
下面,我们就以几个实例来讲解演示进程、线程和协程的适用场景、使用方法,以及如何优化我们的代码。

2. 线程

2.1 线程的最大意义在于并行

通常,代码是单线程顺序执行的,这个线程就是主线程。仅有主线程的话,在同一时刻就只能做一件事情;如果有多件事情要做,那也只能做完一件再去做另一件。这有点类似于过去的说书艺人,情节人物复杂时,只能“花开两朵,各表一枝”。下面这个题目,就是一个需要同时做两件事情的例子。

请写一段代码,提示用户从键盘输入任意字符,然后等待用户输入。如果用户在10秒钟完成输入(按回车键),则显示输入内容并结束程序;否则,不再等待用户输入,而是直接提示超时并结束程序。

我们知道,input()函数用于从键盘接收输入,time.sleep()函数可以令程序停止运行指定的时长。不过,在等待键盘输入的时候,sleep()函数就无法计时,而在休眠的时候,input()函数就无法接收键盘输入。不借助于线程,我们无法同时做这两件事情。如果使用线程技术的话,我们可以在主线程中接收键盘输入,在子线程中启动sleep()函数,一旦休眠结束,子线程就杀掉主线程,结束程序。
import os, time
import threading

def monitor(pid):
    time.sleep(10)
    print('\n超时退出!')
    os._exit(0)

m = threading.Thread(target=monitor, args=(os.getpid(), ))
m.setDaemon(True)
m.start()

s = input('请输入>>>')
print('接收到键盘输入:%s'%s)
print('程序正常结束。')

2.2 使用线程处理IO密集型任务

假如从100个网站抓取数据,使用单线程的话,就需要逐一请求这100个站点并处理应答结果,所花费时间就是每个站点花费时间的总和。如果使用多个线程来实现的话,结果会怎样呢?
import time
import requests
import threading

urls = ['https://www.baidu.com''https://cn.bing.com']

def get_html(n):
    for i in range(n):
        url = urls[i%2]
        resp = requests.get(url)
        #print(resp.ok, url)

t0 = time.time()
get_html(100# 请求100次
t1 = time.time()
print('1个线程请求100次,耗时%0.3f秒钟'%(t1-t0))

for n_thread in (2,5,10,20,50):
    t0 = time.time()
    ths = list()
    for i in range(n_thread):
        ths.append(threading.Thread(target=get_html, args=(100//n_thread,)))
        ths[-1].setDaemon(True)
        ths[-1].start()
    for i in range(n_thread):
        ths[i].join()
    t1 = time.time()
    print('%d个线程请求100次,耗时%0.3f秒钟'%(n_thread, t1-t0))
上面的代码用百度和必应两个网站来模拟100个站点,运行结果如下所示。单线程处理大约需要30秒钟。分别使用2、5、10个线程来处理的话,所耗时间与线程数量基本上保持反比关系。当线程数量继续增加20个时,速度不再有显著提升。若将线程数量增至50个,时间消耗反倒略有增加。

1个线程请求100次,耗时30.089秒钟
2个线程请求100次,耗时15.087秒钟
5个线程请求100次,耗时7.803秒钟
10个线程请求100次,耗时4.112秒钟
20个线程请求100次,耗时3.160秒钟
50个线程请求100次,耗时3.564秒钟

这个结果表明,对于IO密集型(本例仅测试网络IO,没有磁盘IO)的任务,适量的线程可以在一定程度上提高处理速度。随着线程数量的增加,速度的提升不再明显。

2.3 使用线程处理计算密集型任务

对于曝光不足或明暗变化剧烈的照片可以通过算法来修正。下图左是一张落日图,因为太阳光线较强导致暗区细节无法辨识,通过低端增强算法可以还原为下图右的样子。

低端增强算法(也有人叫做伽马矫正)其实很简单:对于[0,255]区间内的灰度值v0,指定矫正系数y,使用下面的公式,即可达到矫正后的灰度值v1,其中y一般选择2或者3,上图右就是y为3的效果。

下面的代码,对于一张分辨率为4088x2752的照片实施低端增强算法,这是一项计算密集型的任务。代码中分别使用了广播和矢量计算、单线程逐像素计算、多线程逐像素计算等三种方法,以验证多线程对于计算密集型任务是否有提速效果。
import time
import cv2
import numpy as np
import threading

def gamma_adjust_np(im, gamma, out_file):
    """伽马增强函数:使用广播和矢量化计算"""

    out = (np.power(im.astype(np.float32)/2551/gamma)*255).astype(np.uint8)
    cv2.imwrite(out_file, out)

def gamma_adjust_py(im, gamma, out_file):
    """伽马增强函数:使用循环逐像素计算"""

    rows, cols = im.shape
    out = im.astype(np.float32)
    for i in range(rows):
        for j in range(cols):
            out[i,j] = pow(out[i,j]/2551/3)*255
    cv2.imwrite(out_file, out.astype(np.uint8))

im = cv2.imread('river.jpg', cv2.IMREAD_GRAYSCALE)
rows, cols = im.shape
print('照片分辨率为%dx%d'%(cols, rows))

t0 = time.time()
gamma_adjust_np(im, 3'river_3.jpg')
t1 = time.time()
print('借助NumPy广播特性,耗时%0.3f秒钟'%(t1-t0))

t0 = time.time()
im_3 = gamma_adjust_py(im, 3'river_3_cycle.jpg')
t1 = time.time()
print('单线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))

t0 = time.time()
th_1 = threading.Thread(target=gamma_adjust_py, args=(im[:rows//2], 3'river_3_1.jpg'))
th_1.setDaemon(True)
th_1.start()
th_2 = threading.Thread(target=gamma_adjust_py, args=(im[rows//2:], 3'river_3_2.jpg'))
th_2.setDaemon(True)
th_2.start()
th_1.join()
th_2.join()
t1 = time.time()
print('启用两个线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))

运行结果如下:

照片分辨率为4088x2752
借助NumPy广播特性,耗时0.381秒钟
单线程逐像素处理,耗时34.228秒钟
启用两个线程逐像素处理,耗时36.087秒钟

结果显示,对一张千万级像素的照片做低端增强,借助于NumPy的广播和矢量化计算,耗时0.38秒钟;单线程逐像素处理的话,耗时相当于NumPy的100倍;启用多线程的话,速度不仅没有加快,反倒是比单线程更慢。这说明,对于计算密集型的任务来说,多线程并不能提高处理速度,相反,因为要创建和管理线程,处理速度会更慢一些。

2.4 线程池

尽管多线程可以并行处理多个任务,但开启线程不仅花费时间,也需要占用系统资源。因此,线程数量不是越多越快,而是要保持在合理的水平上。线程池可以让我们用固定数量的线程完成比线程数量多得多的任务。下面的代码演示了使用 Python 的标准模块创建线程池,计算多个数值的平方。
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
        return x*x

>>> with ThreadPoolExecutor(max_workers=4as pool: # 4个线程的线程池
        result = pool.map(pow2, range(10)) # 使用4个线程分别计算0~9的平方

>>> list(result) # result是一个生成器,转成列表才可以直观地看到计算结果
[0149162536496481]
如果每个线程的任务各不相同,使用不同的线程函数,任务结束后的结果处理也不一样,同样可以使用这个线程池。下面的代码对多个数值中的奇数做平方运算,偶数做立方运算,线程任务结束后,打印各自的计算结果。
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
        return x*x

>>> def pow3(x):
        return x*x*x

>>> def save_result(task): # 保存线程计算结果
        global result
        result.append(task.result())

>>> result = list()
>>> with ThreadPoolExecutor(max_workers=3as pool:
        for i in range(10):
            if i%2# 奇数做平方运算
                task = pool.submit(pow2, i)
            else# 偶数做立方运算
                task = pool.submit(pow3, i)
            task.add_done_callback(save_result) # 为每个线程指定结束后的回调函数

>>> result
[018964252164951281]

3. 进程

3.1 使用进程处理计算密集型任务

和线程相比,进程的最大优势是可以充分例用计算资源——这一点不难理解,因为不同的进程可以运行在不同CPU的不同的核上。假如一台计算机的CPU共有16核,则可以启动16个或更多个进程来并行处理任务。对于上面的例子,我们改用进程来处理,效果会怎样呢?
import time
import cv2
import numpy as np
import multiprocessing as mp

def gamma_adjust_py(im, gamma, out_file):
    """伽马增强函数:使用循环逐像素计算"""

    rows, cols = im.shape
    out = im.astype(np.float32)
    for i in range(rows):
        for j in range(cols):
            out[i,j] = pow(out[i,j]/2551/3)*255
    cv2.imwrite(out_file, out.astype(np.uint8))

if __name__ == '__main__':
    mp.freeze_support()

    im_fn = 'river.jpg'
    im = cv2.imread(im_fn, cv2.IMREAD_GRAYSCALE)
    rows, cols = im.shape
    print('照片分辨率为%dx%d'%(cols, rows))

    t0 = time.time()
    pro_1 = mp.Process(target=gamma_adjust_py, args=(im[:rows//2], 3'river_3_1.jpg'))
    pro_1.daemon = True
    pro_1.start()
    pro_2 = mp.Process(target=gamma_adjust_py, args=(im[rows//2:], 3'river_3_2.jpg'))
    pro_2.daemon = True
    pro_2.start()
    pro_1.join()
    pro_2.join()
    t1 = time.time()
    print('启用两个进程逐像素处理,耗时%0.3f秒钟'%(t1-t0))
运行结果如下:

照片分辨率为4088x2752
启用两个进程逐像素处理,耗时17.786秒钟

使用单个线程或两个线程的时候,耗时大约30+秒,改用两个进程后,耗时17.786秒,差不多快了一倍。如果使用4个进程(前提是运行代码的计算机至少有4个CPU核)的话,速度还能提高一倍,有兴趣的朋友可以试一下。这个测试表明,对于计算密集型的任务,使用多进程并行处理是有效的提速手段。通常,进程数量选择CPU核数的整倍数。

3.2 进程间通信示例

多进程并行弥补了多线程技术的不足,我们可以在每一颗 CPU 上,或多核 CPU 的每一个核上启动一个进程。如果有必要,还可以在每个进程内再创建适量的线程,最大限度地使用计算资源来解决问题。不过,进程技术也有很大的局限性,因为进程不在同一块内存区域内,所以和线程相比,进程间的资源共享、通信、同步等都要麻烦得多,受到的限制也更多。
我们知道,线程间通信可以使用队列、互斥锁、信号量、事件和条件等多种同步方式,同样的,这些手段也可以应用在进程间。此外,multiprocessing 模块还提供了管道和共享内存等进程间通信的手段。下面仅演示一个进程间使用队列通信,更多的通信方式请参考由人民邮电出版社出版的拙著《Python高手修炼之道》。
这段代码演示了典型的生产者—消费者模式。进程 A 负责随机地往地上“撒钱”(写队列),进程 B 负责从地上“捡钱”(读队列)。
import os, time, random
import multiprocessing as mp

def sub_process_A(q):
    """A进程函数:生成数据"""

    while True:
        time.sleep(5*random.random()) # 在0-5秒之间随机延时
        q.put(random.randint(10,100)) # 随机生成[10,100]之间的整数

def sub_process_B(q):
    """B进程函数:使用数据"""

    words = ['哈哈,''天哪!''My God!''咦,天上掉馅饼了?']
    while True:
        print('%s捡到了%d块钱!'%(words[random.randint(0,3)], q.get()))

if __name__ == '__main__':
    print('主进程(%s)开始,按回车键结束本程序'%os.getpid())

    q = mp.Queue(10)

    p_a = mp.Process(target=sub_process_A, args=(q,))
    p_a.daemon = True
    p_a.start()

    p_b = mp.Process(target=sub_process_B, args=(q,))
    p_b.daemon = True
    p_b.start()

    input()

3.3 进程池

使用多进程并行处理任务时,处理效率和进程数量并不总是成正比。当进程数量超过一定限度后,完成任务所需时间反而会延长。进程池提供了一个保持合理进程数量的方案,但合理进程数量需要根据硬件状况及运行状况来确定,通常设置为 CPU 的核数。
multiprocessing.Pool(n) 可创建 n 个进程的进程池供用户调用。如果进程池任务不满,则新的进程请求会被立即执行;如果进程池任务已满,则新的请求将等待至有可用进程时才被执行。向进程池提交任务有以下两种方式。
  • apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交。即使进程池已满,也会接
    受新的任务,不会阻塞主进程。新任务将处于等待状态。

  • apply(func[, args[, kwds]]) :阻塞式提交。若进程池已满,则主进程阻塞,直至有空闲
    进程可以使用。

下面的代码演示了进程池的典型用法。读者可自行尝试阻塞式提交和非阻塞式提交两种方法的差异。
import time
import multiprocessing as mp

def power(x, a=2):
    """进程函数:幂函数"""

    time.sleep(1)
    print('%d的%d次方等于%d'%(x, a, pow(x, a)))

def demo():
    mpp = mp.Pool(processes=4)

    for item in [2,3,4,5,6,7,8,9]:
        mpp.apply_async(power, (item, )) # 非阻塞提交新任务
        #mpp.apply(power, (item, )) # 阻塞提交新任务

    mpp.close() # 关闭进程池,意味着不再接受新的任务
    print('主进程走到这里,正在等待子进程结束')
    mpp.join() # 等待所有子进程结束
    print('程序结束')

if __name__ == '__main__':
    demo()

4. 协程

4.1 协程和线程的区别

如前文所述,线程常用于多任务并行。对于可以切分的IO密集型任务,将切分的每一小块任务分配给一个线程,可以显著提高处理速度。而协程,无论有多少个,都被限定在一个线程内执行,因此,协程又被称为微线程。
从宏观上看,线程任务和协程任务都是并行的。从微观上看,线程任务是分时切片轮流执行的,这种切换是系统自动完成的,无需程序员干预;而协程则是根据任务特点,在任务阻塞时将控制权交给其他协程,这个权力交接的时机和位置,由程序员指定。由此可以看出,参与协程管理的每一个任务,必须存在阻塞的可能,且阻塞条件会被其它任务破坏,从而得以在阻塞解除后继续执行。
尽管协程难以驾驭,但是由于是在一个线程内运行,免除了线程或进程的切换开销,因而协程的运行效率高,在特定场合下仍然被广泛使用。

4.2 协程演进史

Py2时代,Python并不支持协程,仅可通过yield实现部分的协程功能。另外,还可以通过gevent等第三方库实现协程。gevent最好玩的,莫过于monkey_patch(猴子补丁),曾经有一段时间,我特别喜欢使用它。
从Py3.4开始,Python内置asyncio标准库,正式原生支持协程。asyncio的异步操作,需要在协程中通过yield from完成。协程函数则需要使用@asyncio.coroutine装饰器。
不理解生成器的同学,很难驾驭yield这个反人类思维的东西,为了更贴近人类思维,Py3.5引入了新的语法async和await,可以让协程的代码稍微易懂一点点。如果此前没有接触过协程,我建议你只学习async和await的用法就足够了,不需要去了解早期的yield和后来的yield from。本质上,async就是@asyncio.coroutine,await就是yield from,换个马甲,看起来就顺眼多了。

4.3 协程应用示例

作为基础知识,在介绍协程应用示例前,先来介绍一下队列。在进程、线程、协程模块中,都有队列(Queue)对象。队列作为进程、线程、协程间最常用的通信方式,有一个不容忽视的特性:阻塞式读和写。当队列为空时,读会被阻塞,直到读出数据;当队列满时,写会被阻塞,直到队列空出位置后写入成功。因为队列具有阻塞式读写的特点,正好可以在协程中利用阻塞切换其他协程任务。
我们来构思一个这样的应用:某个富豪(rich)手拿一沓钞票,随机取出几张,撒在地上(如果地上已经有钞票的话,就等有人捡走了再撒);另有名为A、B、C的三个幸运儿(lucky),紧盯着撒钱的富豪,只要富豪把钱撒到地上,他们立刻就去捡起来。
如果用协程实现上述功能的话,我们可以用长度为1的协程队列来存放富豪每一次抛撒的钱。一旦队列中有钱(队列满),富豪就不能继续抛撒了,抛撒被阻塞,协程控制权转移。三个幸运儿中的某一个获得控制权,就去读队列(捡钱),如果队列中没有钱(队列空),捡钱被阻塞,协程控制权转移。依靠队列的阻塞和解除阻塞,一个富豪和三个幸运儿可以顺利地分配完富豪手中的钞票。为了让这个过程可以慢到适合观察,可以在富豪抛钱之前,再增加一个随机延时。当然,这个延时不能使用time模块的sleep()函数,而是使用协程模块asyncio的sleep()函数。下面是完整的撒钱-捡钱代码。
import asyncio, random

async def rich(q, total):
    """任性的富豪,随机撒钱"""

    while total > 0:
        money = random.randint(10,100)
        total -= money
        await q.put(money) # 随机生成[10,100]之间的整数
        print('富豪潇洒地抛了%d块钱'%money)
        await asyncio.sleep(3*random.random()) # 在0-3秒之间随机延时

async def lucky(q, name):
    """随时可以捡到钱的幸运儿"""

    while True:
        money = await q.get()
        q.task_done()
        print('%s捡到了%d块钱!'%(name, money))

async def run():
    q = asyncio.Queue(1)

    producers = [asyncio.create_task(rich(q, 300))]
    consumers = [asyncio.create_task(lucky(q, name)) for name in 'ABC']

    await asyncio.gather(*producers,)
    await q.join()

    for c in consumers:
        c.cancel()

if __name__ == '__main__':
    asyncio.run(run())
运行结果如下:

富豪潇洒地抛了42块钱
A捡到了42块钱!
富豪潇洒地抛了97块钱
A捡到了97块钱!
富豪潇洒地抛了100块钱
B捡到了100块钱!
富豪潇洒地抛了35块钱
C捡到了35块钱!
富豪潇洒地抛了17块钱
A捡到了17块钱!
富豪抛完了手中的钱,转身离去

推荐阅读

全面拥抱FastApi — 蓝图APIRouter


Python协程与异步编程超全总结



浏览 45
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报