【Python】Pandas groupby加速处理数据

共 1831字,需浏览 4分钟

 ·

2021-11-09 20:28

在使用pandas的时候,经常会用到groupby这个函数来对数据进行分组统计,同时可以使用 apply函数很方便的对分组之后的数据进行处理。

def data_process(x):    # process    return ...
result = df.groupby('user_id').apply(data_process)


使用joblib进行加速


但是如果数据非常多的时候(比如几千万条数据),运行的效率是比较低的,因为这个时候只使用了一个CPU线程,所以当数据非常多的时候,处理起来会很慢。这个时候CPU其他的核是空闲的,所以考虑使用joblib来多线程加速。

from joblib import Parallel, delayed
def data_process(x): # process return ...
def applyParallel(dfGrouped, func): res = Parallel(n_jobs=4)(delayed(func)(group) for name, group in dfGrouped) return pd.concat(res)
result = applyParallel(df.groupby('user_id'), data_process)


使用pandarallel进行加速


除了使用joblib之外,还可以使用pandarallel进行加速,使用pandarallel无需编写函数,只需要提前初始化pandarallel即可。

from pandarallel import pandarallelpandarallel.initialize(progress_bar=True)# 可选参数:# nb_workers:用于并行化的工作程序数。数值类型,如果未设置,则将使用所有可用的CPU。# progress_bar:如果设置为,则显示进度条True。(False默认为布尔)# verbose:详细级别(2默认为int )#          0-不显示任何日志#          1-仅显示警告日志#          2-显示所有日志# use_memory_fs:(None默认为布尔)如果设置为None且内存文件系统可用,Pandarallel将使用它在主进程和工作进程之间传输数据。# 如果内存文件系统不可用,则Pandarallel将默认进行多处理数据传输(管道)。如果设置为True,则Pandarallel将使用内存文件系统在主进程和工作进程之间传输数据,SystemError如果内存文件系统不可用,则会引发。# 如果设置为False,则Pandarallel将使用多处理数据传输(管道)在主进程和工作进程之间传输数据。
def data_process(x): # process return ...
result = df.groupby('user_id').parallel_apply(data_process)

需要说明的是,pandarallel目前只能在Linux和OS X上运行。使用内存文件系统(参数use_memory_fs控制)可以减少主进程与工作进程之间的数据传输时间,尤其是对于大数据而言。仅当目录/dev/shm存在且用户对其具有读写权限时,才认为内存文件系统可用。基本上,内存文件系统仅在某些Linux发行版(包括Ubuntu)上可用。

并行化具有成本(实例化新进程,通过共享内存发送数据等),因此,只有在要进行并行化的计算量足够高的情况下,并行化才有效。对于很少的数据,使用并行化并不总是值得的。

参考:https://github.com/nalepae/pandaralle

往期精彩回顾




站qq群554839127,加入微信群请扫码:
浏览 77
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报