Python性能优化-极速数据处理 -- Numba和Pandas

  |  

摘要: 《Python性能分析与优化》笔记,数据处理场景如何做 Python 的性能优化。

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


写在前面

这是 Python 性能分析与优化系列的第 12 篇文章,主要参考 Python性能分析与优化 这本书。

往期回顾

第一期: Python性能分析基础
第二期: Python性能分析器 — cProfile
第三期: 使用cProfile进行性能分析与优化实践
第四期: Python性能分析器 — line_profiler
第五期: line_profiler性能分析实践 — 优化倒排索引
第六期: Python性能分析-可视化
第七期: Python性能优化-优化每一个细节
第八期: Python性能优化-多进程与多线程
第九期: Python性能优化-PyPy
第十期: Python性能优化-Cython-1
第十一期: Python性能优化-Cython-2

依靠在前面若干期的分析与优化技术,针对一般的需求,我们可以写出非常快速且令人满意的代码了。

但是实际工作中还会有一些特殊情况需要优化,例如数据处理这个场景。数据处理是编程领域众多的话题之一,由于 Python 经常用于解决科学研究和数据科学的问题,所以对于Python来说,数据处理是主流的话题。

本文主要讨论以下两种工具:

  • Numba: 用机器码实现高性能的 Python 代码
  • Pandas: 高性能的数据结构和分析工具

1. Numba

Numba 是一个模块,可以通过装饰器控制 Python 解释器把函数转变成机器码。因此 Numba 实现了与 C 和 Cython 同样的性能,但不需要用新的解释器或者写 C 代码。

  • Numba 只是针对数组操作进行优化,非常适合配合 Numpy 使用
  • 并非每个函数都适合用 Numba 优化,滥用会损害性能

conda 安装

1
conda install numba

源码安装(numpy和llvmlite包需要提前装好)

1
2
git clone git://github.com/numba/numba.git
python setup build_ext --inplace

Numba 的主要功能

  • 即时代码生成(On-the-fly code generation)
  • CPU和GPU原生代码生成
  • 与具有NumPy依赖的Python科学计算软件配合使用

Numba 代码生成

使用@jit装饰器。加上它就表示要用Numba的JIT编译器对函数进行优化。

关于 JIT 编译器的好处,我们在文章 Python性能优化-PyPy 中提到过。

(1) 延迟编译

延迟编译是默认的,也是官方推荐的方法

当函数被调用时,Numba将生成优化代码。它将引用属性类型和函数的返回类型

1
2
3
4
5
from numba import jit
@jit
def func(a, b):
...
return ...

(2) 及时编译

如果知道函数的接收类型/返回类型,可以把这些类型传到@jit装饰器。之后,只有这种特殊情况会被优化。

增加的部分会被传递到函数的签名。

1
2
3
4
5
from numba import jit, int32
@jit(int32(int32, int32))
def func(a, b):
...
return ...

用于指定函数签名的常用类型

  • void: 函数返回值类型,表示不返回任何结果。
  • intpuintp: 指针大小的整数,分别表示签名和无签名类型。
  • intcuintc: 相当于C语言的整型和无符号整型。
  • int8,int16,int32int64: 固定宽度整型(无符号整型前面加u,比如uint8)。
  • float32float64: 单精度和双精度浮点数类型。
  • complex64complex128: 单精度和双精度复数类型。
  • 数组可以用任何带索引的数值类型表示,比如float32[:]就是一维浮点数数组类型,int32[:,:]就是二维整型数组。

(3) 没有GIL

无论何时,只要我们的代码用原始类型优化(不是用Python类型), GIL 就不再必要了。

我们可以把nogil=True属性传到装饰器。这样我们就可以用多线程运行Python代码(Numba代码)。

只要不再受GIL的限制,你就可以处理多线程系统的常见问题了(一致性、数据同步、竞态条件等)。

(4) 无Python模式

设置Numba的编译模式,一共有两种模式。

  • object模式: 它产生的代码可以处理所有Python对象,并用C API完成Python对象上的操作。
  • nopython模式: 它可以不调用C API而生成更高效的代码。

强制转换 nopython 模式的方法:

1
2
3
4
@jit(nopython=True)
def add2(a, b):
...
return ...

如果是 object 模式(不加 nopython=True),要注意: 如果Numba不利用循环JIT(loop-jitting)方法,object模式就不会产生更快的代码。也就是说循环可以被提取然后编译成nopython模式

nopython模式的问题

  • 有一些限制,支持的Python子集范围有限
  • 函数里表示数值的所有原生类型都可以被引用
  • 函数里不可以分配新内存

例子: 求矩阵的和, numba 的优化过程

原始代码

1
2
3
4
5
6
7
def sum0(n, m):
array = np.arange(n * m).reshape(n, m)
_sum = 0
for i in range(n):
for j in range(m):
_sum += array[i, j]
return _sum

object模式(autojit)

1
2
3
4
5
6
7
8
@jit
def sum1(n, m):
array = np.arange(n * m).reshape(n, m)
_sum = 0
for i in range(n):
for j in range(m):
_sum += array[i, j]
return _sum

将循环抽出,编译成 nopython

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# force compilation in nopython mode
@jit(nopython=True)
def jitted_loop(array, n, m):
_sum = 0
for i in range(n):
for j in range(m):
_sum += array[i, j]
return _sum

# compiled in object mode
@jit
def sum2(n, m):
array = np.arange(n * m).reshape(n, m)
return jitted_loop(array, n, m)

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time

from auto_jitting import sum0, sum1, sum2

N = int(1e3)
M = int(1e4)

def main():
start = time.process_time()
res = sum0(N, M)
end = time.process_time() - start
print("origin:")
print(" result: {}, time: {:.6f}".format(res, end))
start = time.process_time()
res = sum1(N, M)
end = time.process_time() - start
print("auto jit:")
print(" result: {}, time: {:.6f}".format(res, end))
start = time.process_time()
res = sum2(N, M)
end = time.process_time() - start
print("loop-nopython:")
print(" result: {}, time: {:.6f}".format(res, end))

if __name__ == "__main__":
main()

结果

1
2
3
4
5
6
origin:
result: 49999995000000, time: 3.046496
auto jit:
result: 49999995000000, time: 0.652819
loop-nopython:
result: 49999995000000, time: 0.279308

在 GPU 上运行代码

Numba代码可以运行在CPU和GPU上。在GPU上运行并行程序可以进一步提升性能,比CPU上运行更快。

具体地说就是,Numba支持CUDA编程。按照CUDA模式的规则把一部分Python代码翻译成CUDA核心与设备支持的语言形式。

GPU编程是个很大的主题,所以这里不展开细节,只说明Numba具有这种能力,通过装饰器@cuda.jit就可以实现。

详细内容可以参考 Numba for CUDA


2. Pandas 工具

在数据分析领域,有两种语言主导性能竞赛: R 和Python。

在数据处理方面 Python 的速度显然是不够快的。这就是 pandas 被创造出来的原因。

用 pandas 写代码时在性能方面的一个亮点,是 pandas 的关键代码都是用 Cython 写的。

例子: 读取文件并统计某个字段各个值出现的次数

数据 tweets.csv 格式如下:

1
2
3
4
5
6
7
"0","1","2","3","4","5"
"0","1686133317","Sun May 03 03:56:12 PDT 2009","NO_QUERY","bgubbles","just beat peter in bowlin... but then he won "
"0","1686133440","Sun May 03 03:56:15 PDT 2009","NO_QUERY","Brook_K220","Off to work again "
"0","1686133767","Sun May 03 03:56:20 PDT 2009","NO_QUERY","lewishudson01","@LucasCruikshank I feel sorry for you "
"0","1686133923","Sun May 03 03:56:24 PDT 2009","NO_QUERY","CartiTarti","Trust my dad to divert the traffic. He always has to get involved http://twitpic.com/4h1sn"
"0","1686134211","Sun May 03 03:56:30 PDT 2009","NO_QUERY","vonnavon314","...one minor over look caused me to miss my mother _ at least I spoke to her & can go back to sleep!!! : D"
"0","1686134298","Sun May 03 03:56:32 PDT 2009","NO_QUERY","glenna_boo","life got me restless. i guess its all about the mistakes. even the HUGE ones. "

数据共 160 万行,228M

我们要统计第三列的时间字符串中,前三个项目【星期 月份 日期】的每个组合出现的次数。

首先如果读取出某一行在第二列的字符串,我们需要一个函数获取到前三个项目的子串。

1
2
def get_key(record):
return " ".join(record.split(" ")[:3])

纯 Python 方法

step1: 用 csv 标准库读文件,每一行以字符串存储。
step2: 枚举每一行,用空格分隔得到记录的列表 records
step3: 将 records[2] 传入 get_key
step4: 用字典对返回的 get_key 返回的字符串计数。
step5: 返回按计数排序后最大的十条

1
2
3
4
5
6
7
8
9
10
11
12
def readCSV(fname):
with open(fname, 'r') as csvfile:
reader = csv.reader(csvfile)
lines = [line for line in reader]
return lines

def process(fname):
content = readCSV(fname)
mapping = collections.defaultdict(int)
for records in content:
mapping[get_key(records[2])] += 1
return sorted(mapping.items(), reverse=True, key=lambda a: int(a[1]))[:10]

Pandas 处理方式1 — 用 apply(func, axis=1, args=(…)) 按行处理

用 pd.read_csv 读文件
用 df.apply(func, axis=1, args=(mapping,)) 处理每一行
在处理行 row 的 func 中,将 row.iloc[2] 传入 get_key,返回的字符串在字典中计数
返回按计数排序后最大的十条

1
2
3
4
5
6
7
8
def func1(row, mapping):
mapping[get_key(row.iloc[2])] += 1

def process_pandas1(fname):
df = pd.read_csv(fname, header=None)
mapping = collections.defaultdict(int)
df.apply(func1, axis=1, args=(mapping,))
return sorted(mapping.items(), reverse=True, key=lambda a: int(a[1]))[:10]

Pandas 处理方式2 — 用 df.iloc[:,2].map(func) 对特定列按元素处理

用 pd.read_csv 读文件
用 df.iloc[:,2].map(func) 处理特定列的每一个元素
在处理特定元素 record 的函数中,将 record 传入 get_key,返回的字符串在字典中计数
在处理特定元素 record 的函数需要一个 mapping 参数,可以用返回函数的函数包装一下,见代码
返回按计数排序后最大的十条

1
2
3
4
5
6
7
8
9
10
def func2(mapping):
def f(record):
mapping[get_key(record)] += 1
return f

def process_pandas2(fname):
df = pd.read_csv(fname, header=None)
mapping = collections.defaultdict(int)
df.iloc[:,2].map(func2(mapping))
return sorted(mapping.items(), reverse=True, key=lambda a: int(a[1]))[:10]

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def test(method):
if method == "0":
start = time.process_time()
total = process(SOURCE_FILE)
end = time.process_time() - start
for item in total:
print("{}\t{}".format(item[0], item[1]))
print("(Pure Python) time: {:.6f}".format(end - start))
elif method == "1":
start = time.process_time()
total = process_pandas1(SOURCE_FILE)
end = time.process_time() - start
for item in total:
print("{}\t{}".format(item[0], item[1]))
print("(Pandas 1) time: {:.6f}".format(end - start))
elif method == "2":
start = time.process_time()
total = process_pandas2(SOURCE_FILE)
end = time.process_time() - start
for item in total:
print("{}\t{}".format(item[0], item[1]))
print("(Pandas 2) time: {:.6f}".format(end - start))

if __name__ == "__main__":
import sys
test(sys.argv[1])

测试结果如下

1
2
3
(Pure Python) time: 4.362686
(Pandas 1) time: 19.090629
(Pandas 2) time: 2.905207

可以看到

  • Pandas 处理方式1(用 apply(func, axis=1, args=(…)) 按行处理) 速度反而变慢了。
  • Pandas 处理方式2(用 df.iloc[:,2].map(func) 对特定列按元素处理) 速度变快了。

因此 Pandas 的用法很重要,用对了才能提高效率。

read_csv 层面的优化

read_csv 有下面这两个参数

usecols: 这个参数可以设置我们需要返回的列,帮助我们快速地从众多列中提取需要的两列数据。
converters: 这个参数可以自动利用函数转换数据类型,不需要再使用apply方法进行转换。

1
2
3
4
5
6
7
8
9
10
def func_optim(mapping):
def f(record):
mapping[record] += 1
return f

def process_pandas2_optim(fname):
df = pd.read_csv(fname, usecols=["2"], converters={"2": get_key}, dtype={"2": str})
mapping = collections.defaultdict(int)
df.iloc[:,0].map(func_optim(mapping))
return sorted(mapping.items(), reverse=True, key=lambda a: int(a[1]))[:10]

测试结果,时间从 2.9 提高到 1.3

1
(Pandas 2 optim) time: 1.327164

Share