详解Python多线程、多进程
线程与进程的区别
 
   进程和线程区别
 
    线程与进程的区别可以归纳为以下4点:
-  
     
地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
 -  
     
通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
 -  
     
调度和切换:线程上下文切换比进程上下文切换要快得多。
 -  
     
在多线程OS中,进程不是一个可执行的实体。
 
多进程和多线程的比较
| 对比维度 | 多进程 | 多线程 | 总结 | 
|---|---|---|---|
| 数据共享、同步 | 数据共享复杂,同步简单 | 数据共享简单,同步复杂 | 各有优劣 | 
| 内存、CPU | 占用内存多,切换复杂,CPU利用率低 | 占用内存少,切换简单,CPU利用率高 | 线程占优 | 
| 创建、销毁、切换 | 复杂,速度慢 | 简单,速度快 | 线程占优 | 
| 编程、调试 | 编程简单,调试简单 | 编程复杂,调试复杂 | 进程占优 | 
| 可靠性 | 进程间不会互相影响 | 一个线程挂掉将导致整个进程挂掉 | 进程占优 | 
| 分布式 | 适用于多核、多机,扩展到多台机器简单 | 适合于多核 | 进程占优 | 
-  
     
线程在进程下行进(单纯的车厢无法运行)  -  
     
一个进程可以包含多个线程(一辆火车可以有多个车厢)  -  
     
不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)  -  
     
同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)  -  
     
进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)  -  
     
进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到该趟火车的所有车厢)  -  
     
进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)  -  
     
进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-”互斥锁(mutex)”  -  
     
进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量(semaphore)”  
Python全局解释器锁GIL
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
 
    在多线程环境中,Python 虚拟机按以下方式执行:
-  
     
设置GIL  -  
     
切换到一个线程去运行  -  
     
运行直至指定数量的字节码指令,或者线程主动让出控制(可以调用sleep(0))  -  
     
把线程设置为睡眠状态  -  
     
解锁GIL  再次重复以上所有步骤
 
    -  
     
使用更高版本Python(对GIL机制进行了优化)  -  
     
使用多进程替换多线程(多进程之间没有GIL,但是进程本身的资源消耗较多)  -  
     
指定cpu运行线程(使用affinity模块)  -  
     
使用Jython、IronPython等无GIL解释器  -  
     
全IO密集型任务时才使用多线程  -  
     
使用协程(高效的单线程模式,也称微线程;通常与多进程配合使用)  -  
     
将关键组件用C/C++编写为Python扩展,通过ctypes使Python程序直接调用C语言编译的动态链接库的导出函数。(with nogil调出GIL限制)  
Python的多进程包multiprocessing
Multiprocessing产生的背景
import os
print('Process (%s) start...' % os.getpid())
\# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
 
   上述代码在Linux、Unix和Mac上的执行结果为:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
 
   multiprocessing常用组件及功能
 
    创建管理进程模块:
-  
     
Process(用于创建进程)  -  
     
Pool(用于创建管理进程池)  -  
     
Queue(用于进程通信,资源共享)  -  
     
Value,Array(用于进程通信,资源共享)  -  
     
Pipe(用于管道通信)  -  
     
Manager(用于资源共享)  
同步子进程模块:
-  
     
Condition(条件变量)  -  
     
Event(事件)  -  
     
Lock(互斥锁)  -  
     
RLock(可重入的互斥锁(同一个进程可以多次获得它,同时不会造成阻塞)  -  
     
Semaphore(信号量)  
接下来就一起来学习下每个组件及功能的具体使用方法。
Process(用于创建进程)
multiprocessing模块提供了一个Process类来代表一个进程对象。
在multiprocessing中,每一个进程都用一个Process类来表示。
构造方法:Process([group [, target [, name [, args [, kwargs]]]]])
-  
     
group:分组,实际上不使用,值始终为None  -  
     
target:表示调用对象,即子进程要执行的任务,你可以传入方法名  -  
     
name:为子进程设定名称  -  
     
args:要传给target函数的位置参数,以元组方式进行传入。  -  
     
kwargs:要传给target函数的字典参数,以字典方式进行传入。  
实例方法:
-  
     
start():启动进程,并调用该子进程中的p.run()  -  
     
run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  -  
     
terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁  -  
     
is_alive():返回进程是否在运行。如果p仍然运行,返回True  -  
     
join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  
属性介绍:
-  
     
daemon:默认值为False,如果设为True,代表p为后台运行的守护进程;当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程;必须在p.start()之前设置  -  
     
name:进程的名称  -  
     
pid:进程的pid  -  
     
exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)  -  
     
authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)  
使用示例:(注意:在windows中Process()必须放到if name == ‘main’:下)
from multiprocessing import Process
import os
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
print('Child process end.')
 
   Pool(用于创建管理进程池)
 
    -  
     
processes :要创建的进程数,如果省略,将默认使用cpu_count()返回的数量。  -  
     
initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。  -  
     
initargs:是要传给initializer的参数组。  -  
     
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。  -  
     
context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。  
-  
     
apply(func[, args[, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()。它是阻塞的。apply很少使用  -  
     
apply_async(func[, arg[, kwds={}[, callback=None]]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。它是非阻塞。  -  
     
map(func, iterable[, chunksize=None]):Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。  -  
     
map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async  -  
     
imap():imap 与 map的区别是,map是当所有的进程都已经执行完了,并将结果返回了,imap()则是立即返回一个iterable可迭代对象。  -  
     
imap_unordered():不保证返回的结果顺序与进程添加的顺序一致。  -  
     
close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。  -  
     
join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用,让其不再接受新的Process。  -  
     
terminate():结束工作进程,不再处理未处理的任务。  
-  
     
get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。  -  
     
ready():如果调用完成,返回True  -  
     
successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常  -  
     
wait([timeout]):等待结果变为可用。  -  
     
terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数  
使用示例:
\# -*- coding:utf-8 -*-
\# Pool+map
from multiprocessing import Pool
def test(i):
    print(i)
if __name__ == "__main__":
    lists = range(100)
    pool = Pool(8)
    pool.map(test, lists)
    pool.close()
    pool.join()
 
   \# -*- coding:utf-8 -*-
\# 异步进程池(非阻塞)
from multiprocessing import Pool
def test(i):
    print(i)
if __name__ == "__main__":
    pool = Pool(8)
    for i in range(100):
        '''
        For循环中执行步骤:
        (1)循环遍历,将100个子进程添加到进程池(相对父进程会阻塞)
        (2)每次执行8个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)
        apply_async为异步进程池写法。异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。
        '''
        pool.apply_async(test, args=(i,))  # 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.
    print("test")
    pool.close()
    pool.join()
 
   \# -*- coding:utf-8 -*-
\# 异步进程池(非阻塞)
from multiprocessing import Pool
def test(i):
    print(i)
if __name__ == "__main__":
    pool = Pool(8)
    for i in range(100):
        '''
            实际测试发现,for循环内部执行步骤:
            (1)遍历100个可迭代对象,往进程池放一个子进程
            (2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)
            for循环执行完毕,再执行print函数。
        '''
        pool.apply(test, args=(i,))  # 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.
    print("test")
    pool.close()
    pool.join()
 
   Queue(用于进程通信,资源共享)
-  
     
maxsize是队列中允许最大项数,省略则无大小限制。  
-  
     
put():用以插入数据到队列。put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。  -  
     
get():可以从队列读取并且删除一个元素。get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。若不希望在empty的时候抛出异常,令blocked为True或者参数全部置空即可。  -  
     
get_nowait():同q.get(False)  -  
     
put_nowait():同q.put(False)  -  
     
empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。  -  
     
full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。  -  
     
qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样  
使用示例:
from multiprocessing import Process, Queue
import os, time, random
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()  # 等待pw结束
    pr.terminate()  # pr进程里是死循环,无法等待其结束,只能强行终止
 
   -  
     
maxsize:队列中允许最大项数,省略则无大小限制。  
-  
     
task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常  -  
     
join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止  
使用示例:
\# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
    while True:
        res = q.get()
        print('消费者拿到了 %s' % res)
        q.task_done()
def producer(seq, q):
    for item in seq:
        time.sleep(random.randrange(1,2))
        q.put(item)
        print('生产者做好了 %s' % item)
    q.join()
if __name__ == "__main__":
    q = JoinableQueue()
    seq = ('产品%s' % i for i in range(5))
    p = Process(target=consumer, args=(q,))
    p.daemon = True  # 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    p.start()
    producer(seq, q)
    print('主线程')
 
   Value,Array(用于进程通信,资源共享)
-  
     
typecode_or_type:定义ctypes()对象的类型,可以传Type code或 C Type,具体对照表见下文。  -  
     
args:传递给typecode_or_type构造函数的参数  -  
     
lock:默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。  
| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'`     | signed char        | int               | 1                     |
| `'B'`     | unsigned char      | int               | 1                     |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     |
| `'h'`     | signed short       | int               | 2                     |
| `'H'`     | unsigned short     | int               | 2                     |
| `'i'`     | signed int         | int               | 2                     |
| `'I'`     | unsigned int       | int               | 2                     |
| `'l'`     | signed long        | int               | 4                     |
| `'L'`     | unsigned long      | int               | 4                     |
| `'q'`     | signed long long   | int               | 8                     |
| `'Q'`     | unsigned long long | int               | 8                     |
| `'f'`     | float              | float             | 4                     |
| `'d'`     | double             | float             | 8                     |
 
   参考地址:https://docs.python.org/3/library/array.html
Array
构造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
-  
     
typecode_or_type:同上  -  
     
size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。  -  
     
kwds:传递给typecode_or_type构造函数的参数  -  
     
lock:同上  
使用示例:
import multiprocessing
def f(n, a):
    n.value = 3.14
    a[0] = 5
if __name__ == '__main__':
    num = multiprocessing.Value('d', 0.0)
    arr = multiprocessing.Array('i', range(10))
    p = multiprocessing.Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
 
   注意:Value和Array只适用于Process类。
Pipe(用于管道通信)
-  
     
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。  
-  
     
send(obj):通过连接发送对象。obj是与序列化兼容的任意对象  -  
     
recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。  -  
     
close():关闭连接。如果conn1被垃圾回收,将自动调用此方法  -  
     
fileno():返回连接使用的整数文件描述符  -  
     
poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。  -  
     
recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。  -  
     
send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收  -  
     
recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。  
使用示例:
from multiprocessing import Process, Pipe
import time
\# 子进程执行方法
def f(Subconn):
    time.sleep(1)
    Subconn.send("吃了吗")
    print("来自父亲的问候:", Subconn.recv())
    Subconn.close()
if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # 创建管道两端
    p = Process(target=f, args=(child_conn,))  # 创建子进程
    p.start()
    print("来自儿子的问候:", parent_conn.recv())
    parent_conn.send("嗯")
 
   Manager(用于资源共享)
-  
     
address:(hostname,port),指定服务器的网址地址,默认为简单分配一个空闲的端口  -  
     
authkey:连接到服务器的客户端的身份验证,默认为current_process().authkey的值  
-  
     
start([initializer[, initargs]]):启动一个单独的子进程,并在该子进程中启动管理器服务器  -  
     
get_server():获取服务器对象  -  
     
connect():连接管理器对象  -  
     
shutdown():关闭管理器对象,只能在调用了start()方法之后调用  
-  
     
address:只读属性,管理器服务器正在使用的地址  
-  
     
Array(self,*args,**kwds)  -  
     
BoundedSemaphore(self,*args,**kwds)  -  
     
Condition(self,*args,**kwds)  -  
     
Event(self,*args,**kwds)  -  
     
JoinableQueue(self,*args,**kwds)  -  
     
Lock(self,*args,**kwds)  -  
     
Namespace(self,*args,**kwds)  -  
     
Pool(self,*args,**kwds)  -  
     
Queue(self,*args,**kwds)  -  
     
RLock(self,*args,**kwds)  -  
     
Semaphore(self,*args,**kwds)  -  
     
Value(self,*args,**kwds)  -  
     
dict(self,*args,**kwds)  -  
     
list(self,*args,**kwds)  
使用示例:
import multiprocessing
def f(x, arr, l, d, n):
    x.value = 3.14
    arr[0] = 5
    l.append('Hello')
    d[1] = 2
    n.a = 10
if __name__ == '__main__':
    server = multiprocessing.Manager()
    x = server.Value('d', 0.0)
    arr = server.Array('i', range(10))
    l = server.list()
    d = server.dict()
    n = server.Namespace()
    proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
    proc.start()
    proc.join()
    print(x.value)
    print(arr)
    print(l)
    print(d)
    print(n)
 
   同步子进程模块
Lock(互斥锁)
构造方法:Lock()
实例方法:
-  
     
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。  -  
     
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。  
使用示例:
from multiprocessing import Process, Lock
def l(lock, num):
    lock.acquire()
    print("Hello Num: %s" % (num))
    lock.release()
if __name__ == '__main__':
    lock = Lock()  # 这个一定要定义为全局
    for num in range(20):
        Process(target=l, args=(lock, num)).start()
 
   RLock(可重入的互斥锁(同一个进程可以多次获得它,同时不会造成阻塞)
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
构造方法:RLock()
实例方法:
-  
     
acquire([timeout]):同Lock  -  
     
release(): 同Lock  
Semaphore(信号量)
构造方法:Semaphore([value])
-  
     
value:设定信号量,默认值为1  
实例方法:
-  
     
acquire([timeout]):同Lock  -  
     
release(): 同Lock  
使用示例:
from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
    sem.acquire()
    print('%s 占到一个茅坑' % user)
    time.sleep(random.randint(0, 3))
    sem.release()
    print(user, 'OK')
if __name__ == '__main__':
    sem = Semaphore(2)
    p_l = []
    for i in range(5):
        p = Process(target=go_wc, args=(sem, 'user%s' % i,))
        p.start()
        p_l.append(p)
    for i in p_l:
        i.join()
 
   Condition(条件变量)
构造方法:Condition([lock/rlock])
-  
     
可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。  
-  
     
acquire([timeout]):首先进行acquire,然后判断一些条件。如果条件不满足则wait  -  
     
release():释放 Lock  -  
     
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。处于wait状态的线程接到通知后会重新判断条件。  -  
     
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。  -  
     
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。  
使用示例:
import multiprocessing
import time
def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()
def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))
if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]
    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()
    s1.join()
    for c in s2_clients:
        c.join()
 
   Event(事件)
使用示例:
import multiprocessing
import time
def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()
    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
 
   其他内容
from multiprocessing.dummy import Pool as ThreadPool
 
   multiprocessing.dummy与早期的threading,不同的点好像是在多多核CPU下,只绑定了一个核心(具体未考证)。
参考文档:
-  
     
https://docs.python.org/3/library/multiprocessing.html  -  
     
https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/  
Python并发之concurrent.futures
Executor
ThreadPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用。
class concurrent.futures.ThreadPoolExecutor(max_workers)
 
   使用max_workers数目的线程池执行异步调用。
ProcessPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用。
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
 
   submit()方法
Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。
Executor.submit(fn, *args, **kwargs)
-  
     
fn:需要异步执行的函数  -  
     
*args, **kwargs:fn参数  
使用示例:
from concurrent import futures
def test(num):
    import time
    return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 1)
    print(future.result())
 
   map()方法
Executor.map(func, *iterables, timeout=None)
-  
     
func:需要异步执行的函数  -  
     
*iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。  -  
     
timeout:设置每次异步操作的超时时间,timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。  
使用示例:
from concurrent import futures
def test(num):
    import time
    return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    for future in executor.map(test, data):
        print(future)
 
   shutdown()方法
释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。
Executor.shutdown(wait=True)
Future
-  
     
cancel():试图取消调用。如果调用当前正在执行,并且不能被取消,那么该方法将返回False,否则调用将被取消,方法将返回True。  -  
     
cancelled():如果成功取消调用,返回True。  -  
     
running():如果调用当前正在执行并且不能被取消,返回True。  -  
     
done():如果调用成功地取消或结束了,返回True。  -  
     
result(timeout=None):返回调用返回的值。如果调用还没有完成,那么这个方法将等待超时秒。如果调用在超时秒内没有完成,那么就会有一个Futures.TimeoutError将报出。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。  -  
     
exception(timeout=None):返回调用抛出的异常,如果调用还未完成,该方法会等待timeout指定的时长,如果该时长后调用还未完成,就会报出超时错误futures.TimeoutError。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。如果调用完成并且无异常报出,返回None.  -  
     
add_done_callback(fn):将可调用fn捆绑到future上,当Future被取消或者结束运行,fn作为future的唯一参数将会被调用。如果future已经运行完成或者取消,fn将会被立即调用。  -  
     
wait(fs, timeout=None, return_when=ALL_COMPLETED)  -  
      
等待fs提供的 Future 实例(possibly created by different Executor instances) 运行结束。返回一个命名的2元集合,分表代表已完成的和未完成的  -  
      
return_when 表明什么时候函数应该返回。它的值必须是一下值之一:  -  
       
FIRST_COMPLETED :函数在任何future结束或者取消的时候返回。  -  
       
FIRST_EXCEPTION :函数在任何future因为异常结束的时候返回,如果没有future报错,效果等于  -  
       
ALL_COMPLETED :函数在所有future结束后才会返回。  -  
     
as_completed(fs, timeout=None):参数是一个 Future 实例列表,返回值是一个迭代器,在运行结束后产出 Future实例 。  
使用示例:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
    print(x.result())
print(2)
 
   参考链接:https://pythonhosted.org/futures 作者:钱魏Way 来源:https://www.biaodianfu.com/python-multi-thread-and-multi-process.html
    
    
本文仅做学术分享,如有侵权,请联系删文。
      
      
       
       
      
       
      
      
       
       
        
        
         
         
          
          
           
            —THE END— 
            
          
           
         
          
         
         
          
          
           
            
             
              
               
                
                 
                  
                  
                 
                
               
              
            
          
           
         
          
        
         
       
        
      
       
 
  
