#multiprocessing# Python多进程与多线程

志扬工作室

共 13583字,需浏览 28分钟

 · 2023-09-18

 文章所涉及内容更多来自网络,在此声明,并感谢知识的贡献者!

Python的多线程效率问题

Python的多线程效率问题
尽管Python完全支持多线程编程, 但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。实际上,解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
在讨论普通的GIL之前,有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。

Python的多线程与多进程


Python的多线程与多进程分析
-多进程是多核运算
-效率:运行耗时最少是:多进程 < 普通 < 多线程


Python的多线程与多进程示例


Python的多线程与多进程示例
import multiprocessing as mp
import threading as td

def joba(a, d):
    print('job-a')
    for i in range(20):
        pass
    print('job-a-finish')

def jobb(a, d):
    print('job-b')
    while 1:
        pass
    print('job-b-finish')

if __name__ == "__main__":
    t1 = td.Thread(target=jobb, args=(1, 2))
    p1 = mp.Process(target=joba, args=(1, 2))
    t1.start()
    p1.start()
    t1.join()
    p1.join()

 Python多线程或多进程的输出存储
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果


Python的多进程池示例


Python的多进程池示例
import multiprocessing as mp
def job(x):
    return x * x
def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
if __name__ == '__main__':
    multicore()


import multiprocessing as mp
def job(x):
    return x * x
def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])
if __name__ == '__main__':
    multicore()
总结
Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
map() 放入迭代参数,返回多个结果
apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

Python的共享内存


共享内存
只有用共享内存才能让CPU之间有交流
import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
array = mp.Array('i', [1, 2, 3, 4])

其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。
| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'`     | signed char        | int               | 1                     |
| `'B'`     | unsigned char      | int               | 1                     |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     |
| `'h'`     | signed short       | int               | 2                     |
| `'H'`     | unsigned short     | int               | 2                     |
| `'i'`     | signed int         | int               | 2                     |
| `'I'`     | unsigned int       | int               | 2                     |
| `'l'`     | signed long        | int               | 4                     |
| `'L'`     | unsigned long      | int               | 4                     |
| `'q'`     | signed long long   | int               | 8                     |
| `'Q'`     | unsigned long long | int               | 8                     |
| `'f'`     | float              | float             | 4                     |
| `'d'`     | double             | float             | 8                     |

Python共享内存变量的锁
import multiprocessing as mp
import time
def job(v, num, l):
    l.acquire()  # 锁住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num  # 获取共享内存
        print(v.value)
    l.release()  # 释放
def multicore():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享内存
    p1 = mp.Process(target=job, args=(v, 1, l))  # 需要将lock传入
    p2 = mp.Process(target=job, args=(v, 3, l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
if __name__ == '__main__':
    multicore()
进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

Python的多进程的multiprocessing模块


Python多进程multiprocessing模块

1、multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是函数名字,需要调用的函数
args 函数需要的参数,以 tuple 的形式传入
将daemon设置为True时,则主线程不必等待子进程,主线程结束则所有结束
2、相关方法
star() 方法启动进程,
join() 方法实现进程间的同步,等待所有进程退出。
close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
如果要启动大量的子进程,可以用进程池的方式批量创建子进程
Pool提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。
from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
(1)p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(args,kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
(2)p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(args,**kwargs),然后返回结果。此方法的结果是 AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。多进程并发!
(3)p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
(4)p.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用



import multiprocessing
def worker(num):
    """该函数将在子进程中执行"""
    print('Worker %d' % num)

if __name__ == '__main__':
    # 创建进程池
    pool = multiprocessing.Pool(4)
    # 启动进程池中的进程
    pool.map(worker, range(10))
    # 关闭进程池
    pool.close()
    # 等待进程池中的进程结束
    pool.join()

Python的多进程间通讯


Python进程间的通信
多进程编程中,不同的进程之间需要进行通信。multiprocessing模块提供了多种进程间通信的方式,例如使用队列、管道、共享内存等。
  (1)队列
  队列是一种常用的进程间通信方式。multiprocessing模块中提供了Queue类,可以用来创建队列。下面是一个简单的示例:
import multiprocessing

def producer(q):
    """该函数将在生产者进程中执行"""
    for i in range(10):
        q.put(i)

def consumer(q):
    """该函数将在消费者进程中执行"""
    while True:
        item = q.get()
        if item is None:
            break
        print(item)

if __name__ == '__main__':
    # 创建队列
    q = multiprocessing.Queue()
    # 创建生产者进程
    p1 = multiprocessing.Process(target=producer, args=(q,))
    # 创建消费者进程
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    # 启动进程
    p1.start()
    p2.start()
    # 等待进程结束
    p1.join()
    # 发送结束信号
    q.put(None)
    p2.join()
  在上面的代码中,首先创建了一个Queue对象,然后创建了一个生产者进程和一个消费者进程。生产者进程通过调用put方法将0~9的数字放入队列中,消费者进程通过调用get方法从队列中获取数据,并将其打印出来。最后,调用put方法发送结束信号,然后等待两个进程结束。
  (2)管道
  管道是另一种常用的进程间通信方式。multiprocessing模块中提供了Pipe类,可以用来创建管道。下面是一个简单的示例:
import multiprocessing

def producer(conn):
    """该函数将在生产者进程中执行"""
    for i in range(10):
        conn.send(i)
    conn.close()

def consumer(conn):
    """该函数将在消费者进程中执行"""
    while True:
        item = conn.recv()
        if item is None:
            break
        print(item)

if __name__ == '__main__':
    # 创建管道
    conn1, conn2 = multiprocessing.Pipe()
    # 创建生产者进程
    p1 = multiprocessing.Process(target=producer, args=(conn1,))
    # 创建消费者进程
    p2 = multiprocessing.Process(target=consumer, args=(conn2,))
    # 启动进程
    p1.start()
    p2.start()
    # 等待进程结束
    p1.join()
    # 发送结束信号
    conn1.send(None)
    p2.join()
  在上面的代码中,首先创建了一个管道,然后创建了一个生产者进程和一个消费者进程。生产者进程通过调用send方法将0~9的数字发送到管道中,消费者进程通过调用recv方法从管道中获取数据,并将其打印出来。最后,调用send方法发送结束信号,然后等待两个进程结束。
  (3)共享内存
  共享内存是一种高效的进程间通信方式,它允许多个进程共享同一块内存区域。multiprocessing模块中提供了Value和Array类,可以用来创建共享内存。下面是一个简单的示例:
import multiprocessing

def worker1(n):
    """该函数将在进程1中执行"""
    n.value += 1
    print('worker1:', n.value)

def worker2(n):
    """该函数将在进程2中执行"""
    n.value += 1
    print('worker2:', n.value)

if __name__ == '__main__':
    # 创建共享内存
    n = multiprocessing.Value('i', 0)
    # 创建进程1
    p1 = multiprocessing.Process(target=worker1, args=(n,))
    # 创建进程2
    p2 = multiprocessing.Process(target=worker2, args=(n,))
    # 启动进程
    p1.start()
    p2.start()
    # 等待进程结束
    p1.join()
    p2.join()
  在上面的代码中,首先创建了一个Value对象,用于存储一个整数值。然后创建了两个进程,每个进程都会将共享内存中的值加1,并将其打印出来。最后,等待两个进程结束。
  除了Value类之外,multiprocessing模块还提供了Array类,用于创建共享内存数组。

参考资料


GIL 不一定有效率
https://yulizi123.github.io/tutorials/python-basic/threading/5-GIL/
The "freeze_support()" line can be omitted if the program  is not going to be frozen to produce an executable.    raise RuntimeError
https://blog.csdn.net/qq_34905587/article/details/112020651
Python Multiprocessing(多进程)
https://blog.csdn.net/sikh_0529/article/details/126728914
Python多进程multiprocessing模块介绍
https://www.jianshu.com/p/3ff7d04a39bf
Python编程之多进程(multiprocessing)详解
http://stack.itcast.cn/news/20230404/10534657760.shtml
Python multiprocessing进程池使用详解
https://blog.csdn.net/weixin_39253570/article/details/130817783
multiprocessing --- 基于进程的并行
https://docs.python.org/zh-cn/3/library/multiprocessing.html?highlight=multi
Python 异步编程 多进程
https://www.cjavapy.com/article/2427/
python 中的进程池 -- multiprocessing.pool.Pool
http://www.manongjc.com/detail/63-pndlbcnipsucczm.html
Python 异步 IO(asyncio)、多进程(multiprocessing)、多线程(multithreading)性能对比
https://www.jianshu.com/p/cac56b3d9a18

浏览 129
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报