Python 进程、线程和协程实战指归

AirPython

共 10657字,需浏览 22分钟

 ·

2021-01-06 11:14

1. 前言

前些日子写过几篇关于线程和进程的文章,概要介绍了Python内置的线程模块(threading)和进程模块(multiprocessing)的使用方法,侧重点是线程间同步和进程间同步。随后,陆续收到了不少读者的私信,咨询进程、线程和协程的使用方法,进程、线程和协程分别适用于何种应用场景,以及混合使用进程、线程和协程的技巧。归纳起来,核心的问题大致有以下几个:
  • 使用线程是为了并行还是加速?

  • 为什么我使用多线程之后,处理速度并没有预期的快,甚至更慢了?

  • 我应该选择多进程处理还是多线程处理?

  • 协程和线程有什么不同?

  • 什么情况下使用协程?

在进程、线程和协程的使用上,初学者之所以感到困惑,最主要的原因是对任务的理解不到位。任务是由一个进程、或者线程、或者协程独立完成的、相对独立的一系列工作组合。通常,我们会把任务写成一个函数。任务有3种类型:
  • 计算密集型任务:任务包含大量计算,CPU占用率高

  • IO密集型任务:任务包含频繁的、持续的网络IO和磁盘IO

  • 混合型任务:既有计算也有IO

也有观点认为还有一种数据密集型任务,但我认为数据密集型任务一般出现在分布式系统或异构系统上,必定伴随着计算密集和IO密集,因此,仍然可以归类到混合型任务。
下面,我们就以几个实例来讲解演示进程、线程和协程的适用场景、使用方法,以及如何优化我们的代码。

2. 线程

2.1 线程的最大意义在于并行

通常,代码是单线程顺序执行的,这个线程就是主线程。仅有主线程的话,在同一时刻就只能做一件事情;如果有多件事情要做,那也只能做完一件再去做另一件。这有点类似于过去的说书艺人,情节人物复杂时,只能“花开两朵,各表一枝”。下面这个题目,就是一个需要同时做两件事情的例子。

请写一段代码,提示用户从键盘输入任意字符,然后等待用户输入。如果用户在10秒钟完成输入(按回车键),则显示输入内容并结束程序;否则,不再等待用户输入,而是直接提示超时并结束程序。

我们知道,input()函数用于从键盘接收输入,time.sleep()函数可以令程序停止运行指定的时长。不过,在等待键盘输入的时候,sleep()函数就无法计时,而在休眠的时候,input()函数就无法接收键盘输入。不借助于线程,我们无法同时做这两件事情。如果使用线程技术的话,我们可以在主线程中接收键盘输入,在子线程中启动sleep()函数,一旦休眠结束,子线程就杀掉主线程,结束程序。
import os, time
import threading

def monitor(pid):
    time.sleep(10)
    print('\n超时退出!')
    os._exit(0)

m = threading.Thread(target=monitor, args=(os.getpid(), ))
m.setDaemon(True)
m.start()

s = input('请输入>>>')
print('接收到键盘输入:%s'%s)
print('程序正常结束。')

2.2 使用线程处理IO密集型任务

假如从100个网站抓取数据,使用单线程的话,就需要逐一请求这100个站点并处理应答结果,所花费时间就是每个站点花费时间的总和。如果使用多个线程来实现的话,结果会怎样呢?
import time
import requests
import threading

urls = ['https://www.baidu.com''https://cn.bing.com']

def get_html(n):
    for i in range(n):
        url = urls[i%2]
        resp = requests.get(url)
        #print(resp.ok, url)

t0 = time.time()
get_html(100# 请求100次
t1 = time.time()
print('1个线程请求100次,耗时%0.3f秒钟'%(t1-t0))

for n_thread in (2,5,10,20,50):
    t0 = time.time()
    ths = list()
    for i in range(n_thread):
        ths.append(threading.Thread(target=get_html, args=(100//n_thread,)))
        ths[-1].setDaemon(True)
        ths[-1].start()
    for i in range(n_thread):
        ths[i].join()
    t1 = time.time()
    print('%d个线程请求100次,耗时%0.3f秒钟'%(n_thread, t1-t0))
上面的代码用百度和必应两个网站来模拟100个站点,运行结果如下所示。单线程处理大约需要30秒钟。分别使用2、5、10个线程来处理的话,所耗时间与线程数量基本上保持反比关系。当线程数量继续增加20个时,速度不再有显著提升。若将线程数量增至50个,时间消耗反倒略有增加。

1个线程请求100次,耗时30.089秒钟
2个线程请求100次,耗时15.087秒钟
5个线程请求100次,耗时7.803秒钟
10个线程请求100次,耗时4.112秒钟
20个线程请求100次,耗时3.160秒钟
50个线程请求100次,耗时3.564秒钟

这个结果表明,对于IO密集型(本例仅测试网络IO,没有磁盘IO)的任务,适量的线程可以在一定程度上提高处理速度。随着线程数量的增加,速度的提升不再明显。

2.3 使用线程处理计算密集型任务

对于曝光不足或明暗变化剧烈的照片可以通过算法来修正。下图左是一张落日图,因为太阳光线较强导致暗区细节无法辨识,通过低端增强算法可以还原为下图右的样子。

低端增强算法(也有人叫做伽马矫正)其实很简单:对于[0,255]区间内的灰度值v0,指定矫正系数y,使用下面的公式,即可达到矫正后的灰度值v1,其中y一般选择2或者3,上图右就是y为3的效果。

下面的代码,对于一张分辨率为4088x2752的照片实施低端增强算法,这是一项计算密集型的任务。代码中分别使用了广播和矢量计算、单线程逐像素计算、多线程逐像素计算等三种方法,以验证多线程对于计算密集型任务是否有提速效果。
import time
import cv2
import numpy as np
import threading

def gamma_adjust_np(im, gamma, out_file):
    """伽马增强函数:使用广播和矢量化计算"""

    out = (np.power(im.astype(np.float32)/2551/gamma)*255).astype(np.uint8)
    cv2.imwrite(out_file, out)

def gamma_adjust_py(im, gamma, out_file):
    """伽马增强函数:使用循环逐像素计算"""

    rows, cols = im.shape
    out = im.astype(np.float32)
    for i in range(rows):
        for j in range(cols):
            out[i,j] = pow(out[i,j]/2551/3)*255
    cv2.imwrite(out_file, out.astype(np.uint8))

im = cv2.imread('river.jpg', cv2.IMREAD_GRAYSCALE)
rows, cols = im.shape
print('照片分辨率为%dx%d'%(cols, rows))

t0 = time.time()
gamma_adjust_np(im, 3'river_3.jpg')
t1 = time.time()
print('借助NumPy广播特性,耗时%0.3f秒钟'%(t1-t0))

t0 = time.time()
im_3 = gamma_adjust_py(im, 3'river_3_cycle.jpg')
t1 = time.time()
print('单线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))

t0 = time.time()
th_1 = threading.Thread(target=gamma_adjust_py, args=(im[:rows//2], 3'river_3_1.jpg'))
th_1.setDaemon(True)
th_1.start()
th_2 = threading.Thread(target=gamma_adjust_py, args=(im[rows//2:], 3'river_3_2.jpg'))
th_2.setDaemon(True)
th_2.start()
th_1.join()
th_2.join()
t1 = time.time()
print('启用两个线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))

运行结果如下:

照片分辨率为4088x2752
借助NumPy广播特性,耗时0.381秒钟
单线程逐像素处理,耗时34.228秒钟
启用两个线程逐像素处理,耗时36.087秒钟

结果显示,对一张千万级像素的照片做低端增强,借助于NumPy的广播和矢量化计算,耗时0.38秒钟;单线程逐像素处理的话,耗时相当于NumPy的100倍;启用多线程的话,速度不仅没有加快,反倒是比单线程更慢。这说明,对于计算密集型的任务来说,多线程并不能提高处理速度,相反,因为要创建和管理线程,处理速度会更慢一些。

2.4 线程池

尽管多线程可以并行处理多个任务,但开启线程不仅花费时间,也需要占用系统资源。因此,线程数量不是越多越快,而是要保持在合理的水平上。线程池可以让我们用固定数量的线程完成比线程数量多得多的任务。下面的代码演示了使用 Python 的标准模块创建线程池,计算多个数值的平方。
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
        return x*x

>>> with ThreadPoolExecutor(max_workers=4as pool: # 4个线程的线程池
        result = pool.map(pow2, range(10)) # 使用4个线程分别计算0~9的平方

>>> list(result) # result是一个生成器,转成列表才可以直观地看到计算结果
[0149162536496481]
如果每个线程的任务各不相同,使用不同的线程函数,任务结束后的结果处理也不一样,同样可以使用这个线程池。下面的代码对多个数值中的奇数做平方运算,偶数做立方运算,线程任务结束后,打印各自的计算结果。
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
        return x*x

>>> def pow3(x):
        return x*x*x

>>> def save_result(task): # 保存线程计算结果
        global result
        result.append(task.result())

>>> result = list()
>>> with ThreadPoolExecutor(max_workers=3as pool:
        for i in range(10):
            if i%2# 奇数做平方运算
                task = pool.submit(pow2, i)
            else# 偶数做立方运算
                task = pool.submit(pow3, i)
            task.add_done_callback(save_result) # 为每个线程指定结束后的回调函数

>>> result
[018964252164951281]

3. 进程

3.1 使用进程处理计算密集型任务

和线程相比,进程的最大优势是可以充分例用计算资源——这一点不难理解,因为不同的进程可以运行在不同CPU的不同的核上。假如一台计算机的CPU共有16核,则可以启动16个或更多个进程来并行处理任务。对于上面的例子,我们改用进程来处理,效果会怎样呢?
import time
import cv2
import numpy as np
import multiprocessing as mp

def gamma_adjust_py(im, gamma, out_file):
    """伽马增强函数:使用循环逐像素计算"""

    rows, cols = im.shape
    out = im.astype(np.float32)
    for i in range(rows):
        for j in range(cols):
            out[i,j] = pow(out[i,j]/2551/3)*255
    cv2.imwrite(out_file, out.astype(np.uint8))

if __name__ == '__main__':
    mp.freeze_support()

    im_fn = 'river.jpg'
    im = cv2.imread(im_fn, cv2.IMREAD_GRAYSCALE)
    rows, cols = im.shape
    print('照片分辨率为%dx%d'%(cols, rows))

    t0 = time.time()
    pro_1 = mp.Process(target=gamma_adjust_py, args=(im[:rows//2], 3'river_3_1.jpg'))
    pro_1.daemon = True
    pro_1.start()
    pro_2 = mp.Process(target=gamma_adjust_py, args=(im[rows//2:], 3'river_3_2.jpg'))
    pro_2.daemon = True
    pro_2.start()
    pro_1.join()
    pro_2.join()
    t1 = time.time()
    print('启用两个进程逐像素处理,耗时%0.3f秒钟'%(t1-t0))
运行结果如下:

照片分辨率为4088x2752
启用两个进程逐像素处理,耗时17.786秒钟

使用单个线程或两个线程的时候,耗时大约30+秒,改用两个进程后,耗时17.786秒,差不多快了一倍。如果使用4个进程(前提是运行代码的计算机至少有4个CPU核)的话,速度还能提高一倍,有兴趣的朋友可以试一下。这个测试表明,对于计算密集型的任务,使用多进程并行处理是有效的提速手段。通常,进程数量选择CPU核数的整倍数。

3.2 进程间通信示例

多进程并行弥补了多线程技术的不足,我们可以在每一颗 CPU 上,或多核 CPU 的每一个核上启动一个进程。如果有必要,还可以在每个进程内再创建适量的线程,最大限度地使用计算资源来解决问题。不过,进程技术也有很大的局限性,因为进程不在同一块内存区域内,所以和线程相比,进程间的资源共享、通信、同步等都要麻烦得多,受到的限制也更多。
我们知道,线程间通信可以使用队列、互斥锁、信号量、事件和条件等多种同步方式,同样的,这些手段也可以应用在进程间。此外,multiprocessing 模块还提供了管道和共享内存等进程间通信的手段。下面仅演示一个进程间使用队列通信,更多的通信方式请参考由人民邮电出版社出版的拙著《Python高手修炼之道》。
这段代码演示了典型的生产者—消费者模式。进程 A 负责随机地往地上“撒钱”(写队列),进程 B 负责从地上“捡钱”(读队列)。
import os, time, random
import multiprocessing as mp

def sub_process_A(q):
    """A进程函数:生成数据"""

    while True:
        time.sleep(5*random.random()) # 在0-5秒之间随机延时
        q.put(random.randint(10,100)) # 随机生成[10,100]之间的整数

def sub_process_B(q):
    """B进程函数:使用数据"""

    words = ['哈哈,''天哪!''My God!''咦,天上掉馅饼了?']
    while True:
        print('%s捡到了%d块钱!'%(words[random.randint(0,3)], q.get()))

if __name__ == '__main__':
    print('主进程(%s)开始,按回车键结束本程序'%os.getpid())

    q = mp.Queue(10)

    p_a = mp.Process(target=sub_process_A, args=(q,))
    p_a.daemon = True
    p_a.start()

    p_b = mp.Process(target=sub_process_B, args=(q,))
    p_b.daemon = True
    p_b.start()

    input()

3.3 进程池

使用多进程并行处理任务时,处理效率和进程数量并不总是成正比。当进程数量超过一定限度后,完成任务所需时间反而会延长。进程池提供了一个保持合理进程数量的方案,但合理进程数量需要根据硬件状况及运行状况来确定,通常设置为 CPU 的核数。
multiprocessing.Pool(n) 可创建 n 个进程的进程池供用户调用。如果进程池任务不满,则新的进程请求会被立即执行;如果进程池任务已满,则新的请求将等待至有可用进程时才被执行。向进程池提交任务有以下两种方式。
  • apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交。即使进程池已满,也会接
    受新的任务,不会阻塞主进程。新任务将处于等待状态。

  • apply(func[, args[, kwds]]) :阻塞式提交。若进程池已满,则主进程阻塞,直至有空闲
    进程可以使用。

下面的代码演示了进程池的典型用法。读者可自行尝试阻塞式提交和非阻塞式提交两种方法的差异。
import time
import multiprocessing as mp

def power(x, a=2):
    """进程函数:幂函数"""

    time.sleep(1)
    print('%d的%d次方等于%d'%(x, a, pow(x, a)))

def demo():
    mpp = mp.Pool(processes=4)

    for item in [2,3,4,5,6,7,8,9]:
        mpp.apply_async(power, (item, )) # 非阻塞提交新任务
        #mpp.apply(power, (item, )) # 阻塞提交新任务

    mpp.close() # 关闭进程池,意味着不再接受新的任务
    print('主进程走到这里,正在等待子进程结束')
    mpp.join() # 等待所有子进程结束
    print('程序结束')

if __name__ == '__main__':
    demo()

4. 协程

4.1 协程和线程的区别

如前文所述,线程常用于多任务并行。对于可以切分的IO密集型任务,将切分的每一小块任务分配给一个线程,可以显著提高处理速度。而协程,无论有多少个,都被限定在一个线程内执行,因此,协程又被称为微线程。
从宏观上看,线程任务和协程任务都是并行的。从微观上看,线程任务是分时切片轮流执行的,这种切换是系统自动完成的,无需程序员干预;而协程则是根据任务特点,在任务阻塞时将控制权交给其他协程,这个权力交接的时机和位置,由程序员指定。由此可以看出,参与协程管理的每一个任务,必须存在阻塞的可能,且阻塞条件会被其它任务破坏,从而得以在阻塞解除后继续执行。
尽管协程难以驾驭,但是由于是在一个线程内运行,免除了线程或进程的切换开销,因而协程的运行效率高,在特定场合下仍然被广泛使用。

4.2 协程演进史

Py2时代,Python并不支持协程,仅可通过yield实现部分的协程功能。另外,还可以通过gevent等第三方库实现协程。gevent最好玩的,莫过于monkey_patch(猴子补丁),曾经有一段时间,我特别喜欢使用它。
从Py3.4开始,Python内置asyncio标准库,正式原生支持协程。asyncio的异步操作,需要在协程中通过yield from完成。协程函数则需要使用@asyncio.coroutine装饰器。
不理解生成器的同学,很难驾驭yield这个反人类思维的东西,为了更贴近人类思维,Py3.5引入了新的语法async和await,可以让协程的代码稍微易懂一点点。如果此前没有接触过协程,我建议你只学习async和await的用法就足够了,不需要去了解早期的yield和后来的yield from。本质上,async就是@asyncio.coroutine,await就是yield from,换个马甲,看起来就顺眼多了。

4.3 协程应用示例

作为基础知识,在介绍协程应用示例前,先来介绍一下队列。在进程、线程、协程模块中,都有队列(Queue)对象。队列作为进程、线程、协程间最常用的通信方式,有一个不容忽视的特性:阻塞式读和写。当队列为空时,读会被阻塞,直到读出数据;当队列满时,写会被阻塞,直到队列空出位置后写入成功。因为队列具有阻塞式读写的特点,正好可以在协程中利用阻塞切换其他协程任务。
我们来构思一个这样的应用:某个富豪(rich)手拿一沓钞票,随机取出几张,撒在地上(如果地上已经有钞票的话,就等有人捡走了再撒);另有名为A、B、C的三个幸运儿(lucky),紧盯着撒钱的富豪,只要富豪把钱撒到地上,他们立刻就去捡起来。
如果用协程实现上述功能的话,我们可以用长度为1的协程队列来存放富豪每一次抛撒的钱。一旦队列中有钱(队列满),富豪就不能继续抛撒了,抛撒被阻塞,协程控制权转移。三个幸运儿中的某一个获得控制权,就去读队列(捡钱),如果队列中没有钱(队列空),捡钱被阻塞,协程控制权转移。依靠队列的阻塞和解除阻塞,一个富豪和三个幸运儿可以顺利地分配完富豪手中的钞票。为了让这个过程可以慢到适合观察,可以在富豪抛钱之前,再增加一个随机延时。当然,这个延时不能使用time模块的sleep()函数,而是使用协程模块asyncio的sleep()函数。下面是完整的撒钱-捡钱代码。
import asyncio, random

async def rich(q, total):
    """任性的富豪,随机撒钱"""

    while total > 0:
        money = random.randint(10,100)
        total -= money
        await q.put(money) # 随机生成[10,100]之间的整数
        print('富豪潇洒地抛了%d块钱'%money)
        await asyncio.sleep(3*random.random()) # 在0-3秒之间随机延时

async def lucky(q, name):
    """随时可以捡到钱的幸运儿"""

    while True:
        money = await q.get()
        q.task_done()
        print('%s捡到了%d块钱!'%(name, money))

async def run():
    q = asyncio.Queue(1)

    producers = [asyncio.create_task(rich(q, 300))]
    consumers = [asyncio.create_task(lucky(q, name)) for name in 'ABC']

    await asyncio.gather(*producers,)
    await q.join()

    for c in consumers:
        c.cancel()

if __name__ == '__main__':
    asyncio.run(run())
运行结果如下:

富豪潇洒地抛了42块钱
A捡到了42块钱!
富豪潇洒地抛了97块钱
A捡到了97块钱!
富豪潇洒地抛了100块钱
B捡到了100块钱!
富豪潇洒地抛了35块钱
C捡到了35块钱!
富豪潇洒地抛了17块钱
A捡到了17块钱!
富豪抛完了手中的钱,转身离去




浏览 13
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报