【Python】Python并行——速度++++++++

机器学习初学者

共 5210字,需浏览 11分钟

 ·

2023-04-07 06:01

一直对python的多线程、多进程、分布式多进程比较好奇。今天浅浅地学习了一下,里面涉及的内容其实比较多,包括进程锁、进程间的通信、进程池、共享内存等等。这里给一个简单的、大家可能会常用到的例子——从多个wrfout文件中提取变量T2并单独保存输出为nc文件,一起感受下多进程的魅力。如果不妥之处,还望大家不吝赐教!

常规代码

这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。

    import xarray as xr
import numpy as np
import glob
import sys
import os
import argparse
import time 

def nc2pkl(args):
    st =time.time()
    if not os.path.exists(args.outdir):
            os.mkdir(args.outdir)
    files = glob.glob(args.files)
    
    for file in files:
        filename = file.split('/')[-1]
        print('Reading ',file)
        sys.stdout.flush()
        ds = xr.open_dataset(file)
        T2 = ds['T2']

        sys.stdout.flush()
        # Write out everything 
        ofile = args.outdir+'/'+filename+'.nc'
        T2.to_netcdf(ofile)
        print('Written to '+ofile)

    et = time.time()
    print(et -st)
  
def parse_args():
    '''
    Parser for nc2pkl
    '''

    parser = argparse.ArgumentParser()
    parser.add_argument('-t','--template',type=str,default="../data/wrfout_d01_2018-08-01_00:00:00")
    parser.add_argument('-f','--files',type=str,default="../data/wrfout_d01_2018-08*")
    parser.add_argument('-o','--outdir',type=str,default="../output/T2")
    return parser.parse_args()

if __name__ == '__main__':
    nc2pkl(parse_args())
多进程并行代码

这份代码里面使用了多进程并行,从num_processes = 4可以知道开了4个进程同时处理,可以简单理解为同一时间同时处理4个wrfout文件。其实能开多少进程取决于我们的计算机有多少核数,在linux上可以通过nproc命令查看核数。

如果大家想使用下面的并行代码满足自己的需求,只需要更改被我用-----框起来的函数定义中的操作即可,比如更改变量,或者增加计算等。

    import xarray as xr
import numpy as np
import multiprocessing as mp
from functools import partial
import glob
import sys
import os
import time

#--------------------------------------
def nc2pkl(file_path, output_dir):
    if not os.path.exists(output_dir):
            os.mkdir(output_dir)
    
    filename = file_path.split('/')[-1]
    print('Reading ',file_path)
    sys.stdout.flush()
    ds = xr.open_dataset(file_path)
    T2 = ds['T2']

    sys.stdout.flush()
    # Write out everything 
    ofile = output_dir+'/'+filename+'.nc'
    T2.to_netcdf(ofile)
    print('Written to '+ofile)
#--------------------------------------

def parallel_nc2pkl(input_dir, output_dir, num_processes):
    st = time.time()
    # 获取所有需要处理的文件路径
    file_paths = glob.glob(os.path.join(input_dir, "wrfout_d01_2018-08*"))

    # 创建进程池
    with mp.Pool(processes=num_processes) as pool:
        # 使用partial函数创建一个只有一个参数的nc2pkl函数
        worker_func = partial(nc2pkl, output_dir=output_dir)

        # 将需要处理的文件路径传递给进程池
        pool.map(worker_func, file_paths)
    et = time.time()
    print(et -st)

if __name__ == "__main__":
    # 设置输入和输出目录
    input_dir = "../data/"
    output_dir = "../output/T2_multi"

    # 设置进程数量
    num_processes = 4

    # 并行处理文件
    parallel_nc2pkl(input_dir, output_dir, num_processes)
计算效率

常规代码耗时及CPU使用情况

b2c15f345b66b24c64c1dfa816d94a61.webp

ba72afb824bffb14df0bcbe4e76c2c72.webp

并行代码耗时及CPU使用情况

d8da09bc02723b9e895cff9f762d41ce.webp

dc71c0cf806a39950067faed15ad563a.webp

从中可以看到,并行代码极大地提升了速度。

注意

无论是多进程还是多线程,一旦任务数量多到一个限度,就会消耗掉系统所有的资源,结果就是效率急剧下降,所有任务都做不好。

参考:

【1】https://mofanpy.com/tutorials/python-basic/multiprocessing/why

【2】https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376

欢迎加入读友交流群

因为群聊已满200人, 需扫描下方二维码 添加小编微信拉你入群

b45ac58013cf50eefb926d140b5cc652.webp

浏览 40
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报