Python性能优化-多进程与多线程

  |  

摘要: 《Python性能分析与优化》笔记,并发层面怎样做 Python 性能优化

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


写在前面

这是 Python 性能分析与优化系列的第 8 篇文章,主要参考 Python性能分析与优化 这本书。

往期回顾

第一期: Python性能分析基础
第二期: Python性能分析器 — cProfile
第三期: 使用cProfile进行性能分析与优化实践
第四期: Python性能分析器 — line_profiler
第五期: line_profiler性能分析实践 — 优化倒排索引
第六期: Python性能分析-可视化
第七期: Python性能优化-优化每一个细节


Python 实现并行任务的过程中, 需要关注两个主题:

  • 多线程(multithreading): 这是实现真正的并行任务的经典做法。像 C++ 和 Java 也提供了这类特性。
  • 多进程(multiprocessing): 并非主流,且有一些痛点需要解决,但是它可以作为多线程的另一种版本。

1. 并行与并发

并行与并发的差异如下图所示

parallel_concurrency.jpeg

并发是现代操作系统常用的技术。因为这种技术与计算机处理器的数量无关。操作系统首先需要时刻关注处理器的任务调度时间表,记录每个任务需要的运行时间,然后在不同任务之间进行上下文切换,给每个任务一个时隙。

2. 多线程

多线程的程序可以在同样的上下文中同时运行多条线程,这些线程共享一个进程的资源。

多线程主要的有点如下

  • 持续响应,多线程可以把长期运行的任务放在一个工作线程中,在程序并发地运行任务时可以持续响应客户。
  • 更快的执行速度,在多核或多处理器情况下,多线程可以通过真正的并行提高程序运行速度
  • 降低资源消耗,多线程程序可以利用一个进程的资源影响应多个请求
  • 更简单的状态共享与进程间通信机制
  • 并行化: 多核与多处理器凶可以实现多线程的每个线程独立运行。Cuda 和 OpenCL 是利用多处理器进行并行计算的 GPU 运算环境。

多线程的缺点

  • 线程同步: 由于多个线程是在同一块数据上运行,需要引入预防竞态条件(Race Condition)的机制。
  • 容错: 多个线程好像是独立运行的,但是某个线程出现问题,可能导致进程崩溃
  • 死锁: 线程执行任务时会锁住正在使用的资源,当 a 线程等待 b 线程释放某资源同时 b 线程等待 a 线程释放某资源,发生死锁。

GIL 的限制

通常,多线程技术完全可以在多处理器系统上实现并行计算。但是 CPython 有 GIL 限制:组织多个线程同时运行 Python 字节码。

CPython的GIL是有必要的,因为CPython的内存管理不是线程安全的。因此,为了让每个任务都按顺序进行,它需要确保运行过程中内存不被干扰;它可以更快地运行单线程程序,简化C语言扩展库的使用方法,因为它不需要考虑多线程问题。

在 GIL 限制下使用多线程

  1. 用C写程序, 用 Python 封装: 由于GIL只阻止多个线程同时运行Python的字节码,所以你可以用C语言写程序,然后用Python封装。这样,在程序运行过程中GIL就不会干扰多线程并发了。
  2. 在 IO 密集型任务上用多线程(例如服务器): 服务器大部分时间都是在读数据包。这种情况下,增加线程可以读取更多的包,虽然这并不是真正的并行。这样做可以增加服务器的性能

3. 线程

在Python里使用线程,可以采用下面两种方法:

  • _thread模块: 这个模块提供了有限的线程能力。它很容易使用,适合小项目,不过也会增加一点额外的资源消耗。
  • threading: 这个模块提供了一些更强大、高层次的线程支持。

_thread

_thread 使用简单,不用写很多代码。

该模块中有一个 start_new_thread 方法,可以传以下参数:

  • 函数,即要运行的代码,函数返回时线程就停止运行
  • 一组(元组)参数,即函数的参数
  • 还可传入一个可选的命名参数字典
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import _thread as thread
import time

# Prints the time 5 times, once every "delay" seconds
def print_time(threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print("{}: {}".format(threadName, time.ctime(time.time())))

# Create two threads as follows
try:
thread.start_new_thread(print_time, ("Thread-1", 2))
thread.start_new_thread(print_time, ("Thread-2", 4))
except:
print("Error: unable to start thread")

# We need to keep the program working, otherwise the threads won't live
while 1:
pass

该模块中还有一些线程原生接口:

  • interrupt_main: 向主线程发送中断异常(KeyboardInterrupt),相当于运行时 Ctrl+C。
  • exit: 从后台退出程序,比 interrupt_main 的优点是中断线程时不会引起其它异常。

allocate_lock

allocate_lock 返回一个线程锁,保护重要代码在运行过程中不受竞态条件的干扰。

线程锁对象主要有三个方法:

  1. acquire: 为当前线程请求锁,可以接受一个可选参数。参数是 0,则线程锁一旦被请求则立即被获取。否则表示线程可以等待。
  2. release: 释放线程锁,让下一个线程使用它
  3. locked: 返回线程锁当前是否被某个线程占用

例子: 多线程加一个全局变量的值

不加锁的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import _thread as thread
import time

global_value = 0

def run(threadName):
global global_value
local_copy = global_value
print("{} with value {}".format(threadName, local_copy))
global_value = local_copy + 1

for i in range(10):
thread.start_new_thread(run, ("Thread-" + str(i),))

# We need to keep the program working, otherwise the threads won't live
while 1:
pass

打印结果如下:

1
2
3
4
5
6
7
8
9
10
Thread-5 with value 0
Thread-6 with value 1
Thread-9 with value 2
Thread-1 with value 2
Thread-3 with value 3
Thread-4 with value 3
Thread-8 with value 3
Thread-0 with value 3
Thread-2 with value 3
Thread-7 with value 3

不同线程读取全局变量的时候,读取的是同一个值。我们需要确保代码在赋值、增加、打印数值时,都是被加锁保护的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import _thread as thread
import time

global_value = 0

def run(threadName, lock):
global global_value
lock.acquire()
local_copy = global_value
print("{} with value {}".format(threadName, local_copy))
global_value = local_copy + 1
lock.release()

lock = thread.allocate_lock()

for i in range(10):
thread.start_new_thread(run, ("Thread-" + str(i), lock))

# We need to keep the program working, otherwise the threads won't live
while 1:
pass

threading

处理非常简单的问题

在处理非常简单的问题时,可以直接实例化 threading.Thread 这个类。

回到多线程加一个全局变量的值的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time

global_value = 0

def run( threadName, lock ):
global global_value
lock.acquire()
local_copy = global_value
print("{} with value {}".format(threadName, local_copy))
global_value = local_copy + 1
lock.release()

lock = threading.Lock()

for i in range(10):
t = threading.Thread(target=run, args=("Thread-" + str(i), lock))
t.start()

处理复杂的问题

(1) threading.Thread, run, start

对于更复杂的情况,如果要更好地封装线程的行为,我们需要继承 threading.Thread,创建子类。

  • 改写 run
  • 改写 init,需要首先调用父类的 init
  • 当线程的 run 方法停止或抛出未处理的异常时,线程将停止
  • init 的 name 参数可以命名线程

注意:即使重写 run 方法,里面包含线程的主要逻辑,在线程的方法被调用时你也掌握不了它。但是可以调用 start 方法,这样将创建一个新线程,然后在上下文中调用 run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time

class MyThread(threading.Thread):
def __init__(self, count):
threading.Thread.__init__(self)
self.total = count

def run(self):
for i in range(self.total):
time.sleep(1)
print("Thread: {} - {}".format(self.name, i))


t = MyThread(4)
t2 = MyThread(3)

t.start()
t2.start()

print("This program has finished")

输出结果

1
2
3
4
5
6
7
8
This program has finished
Thread: Thread-2 - 0
Thread: Thread-1 - 0
Thread: Thread-1 - 1
Thread: Thread-2 - 1
Thread: Thread-1 - 2
Thread: Thread-2 - 2
Thread: Thread-1 - 3

(2) join

在上面的例子的输出结果中可以看到,程序在其它内容出现之前已经先发送了退出消息。例子中虽然没有报错,但这个特性在某些情况不可接受,考虑下面的这段代码的例子,线程中要用某个资源,但是释放资源的消息先发送了,这种情况就会报错。

1
2
3
4
5
6
f = open("output-file.txt", "w+")
t = MyThread(4, f)
t2 = MyThread(3, f)
t.start()
t2.start()
f.close()
  • join 可以接收一个时限的参数,秒为单位。
  • join 返回值是 None,要检查操作是否超时,就看 join 方法返回后线程是否还处于激活状态,是的化就超时了。
例子: 检查一组网站的请求状态码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import urllib.request
import threading

sites = [
"https://www.baidu.com",
"https://www.zhihu.com/"
]

class HTTPStatusChecker(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.url = url
self.status = None

def getURL(self):
return self.url

def getStatus(self):
return self.status

def run(self):
self.status = urllib.request.urlopen(self.url).getcode()

threads = []
for url in sites:
t = HTTPStatusChecker(url)
t.start() # start the thread
threads.append(t)

# let the main thread join the others,
# so we can print their result after all of them have finished.
for t in threads:
t.join()

for t in threads:
print("{}: {}".format(t.getURL(), t.getStatus()))

(3) Event 实现线程间通信

Event 包含一个内部标记(internal flag), 以及可以使用set()或clear()方法的调用线程(caller thread)。

Event类的接口如下:

  • is_set: 如果事件设置了内部标记,就返回True。
  • set: 把内部标记设置为True。它可以唤醒等待被设置标记的所有线程。调用wait()方法的线程将不再被阻塞
  • clear: 重置内部标记。调用wait()方法的线程,在调用set()方法之前都将被阻塞。
  • wait: 在事件的内部标记被设置好之前,使用这个方法会一直阻塞线程调用。这个方法接收一个可选参数—等待时限(timeout)。如果等待时限非0,则线程会在时限内被一直阻塞

例子: 轮流打印字符串

两个线程将共享同一个事件对象。在while循环中,每次循环时,一个线程设置标记,另一个线程重置标记。每一次动作(set和clear),它们都会打印正确的字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time

class ThreadA(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event

def run(self):
count = 0
while count < 5:
time.sleep(1)
if self.event.is_set():
print("A")
self.event.clear()
count += 1

class ThreadB(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event

def run(self):
count = 0
while count < 5:
time.sleep(1)
if not self.event.is_set():
print("B")
self.event.set()
count += 1

event = threading.Event()

ta = ThreadA(event)
tb = ThreadB(event)

ta.start()
tb.start()

总结: Python 多线程的使用时机

用线程的情况:

  • 频繁的 IO 操作程序
  • 并行任务可以通过并发解决
  • GUI 开发

不用线程的情况:

  • 大量CPU操作的程序
  • 程序必须用多核OS

4. 多进程

在多进程里,线程被换成了一个个子进程。每个进程都运作各自的GIL。

线程都是同一个进程的组成部分,它们共享同一块内存、存储空间和计算资源。而进程却不会与生成它们的父进程共享内存,因此进程间的通信比线程间通信更加复杂。

多进程与多线程的对比

优势 劣势
使用多核OS 更多内存消耗
独立内存空间,避免竞态问题 进程间数据共享困难
子进程容易中断 IPC 比线程困哪

multiprocessing.Process

相当于 threading.Thread,代码结构差不多。

以下为直接实例化 multiprocessing.Process 这个类的例子,与直接实例化 threading.Thread 的例子对比。

1
2
3
4
5
6
7
8
9
import multiprocessing

def run(pname):
print(pname)

for i in range(10):
p = multiprocessing.Process(target=run, args=("Process-" + str(i),))
p.start()
p.join()

进程退出状态

当进程结束(或中断)的时候,会产生一个退出码(exitcode),它是一个数字,表示执行的结果。

  • 等于0表示正常完结
  • 大于0表示异常完结
  • 小于0表示进程被另一个进程通过-1*exit_code信号终结

进程退出状态的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing
import time

def first():
print("There is no problem here")

def second():
raise RuntimeError("Error raised!")

def third():
time.sleep(3)
print("This process will be terminated")

workers = [multiprocessing.Process(target=first)
,multiprocessing.Process(target=second)
,multiprocessing.Process(target=third)
]

for w in workers:
w.start()

workers[-1].terminate()

for w in workers:
w.join()

for w in workers:
print(w.exitcode)

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
There is no problem here
Process Process-2:
Traceback (most recent call last):
File "/home/ppp/anaconda3/envs/python-3.6/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/ppp/anaconda3/envs/python-3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "multiprocessing_exit_code_example.py", line 8, in second
raise RuntimeError("Error raised!")
RuntimeError: Error raised!
0
1
-15
  • 第三个子进程的 print 没有执行,因为 sleep 结束前进程已经被 terminate 方法中止了
  • 如果开启每个子进程时都执行 join,而不是还没有用 join 就调用 terminate 了,则第三个进程的 print 就不会失败,且退出码将是 0。因为join()方法在目标进程完结之前会阻塞子进程的调用

进程池

multiprocessing.Pool 里装有子进程,可以通过不同的方法执行一组任务。

Pool 的主要方法

  • apply: 独立的子进程中运行一个函数,会在被调用函数返回结果之前阻塞进程
  • apply_async: 在独立子进程中异步地运行一个函数,进程会立即返回一个 ApplyResult 对象。要获得真实的返回值,需要用 get() 方法,get() 在异步执行的函数结束之前都会被阻塞
  • map: 对一组数值应用同一个函数。它是一个阻塞动作,所以返回值是每个值经过函数映射的列表

进程间通信

multiprocessing.Queue: 既线程安全又进程安全的 FIFO 数据交换机制。它是 queue.Queue 的克隆版本,两者 API 基本相同。

用 Queue 进行进程间通信的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Queue, Process
import random

def generate(q):
while True:
value = random.randrange(10)
q.put(value)
print("Value added to queue: {}".format(value))

def reader(q):
while True:
value = q.get()
print("Value from queue: {}".format(value))

queue = Queue()
p1 = Process(target=generate, args=(queue,))
p2 = Process(target=reader, args=(queue,))

p1.start()
p2.start()

multiprocessing.Pipe 为两个进程提供一种双向通信的机制。Pipe() 返回一对链接对象,每个对象表示管道的一端。每个对象有 send()recv() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Pipe, Process
import random

def generate(pipe):
while True:
value = random.randrange(10)
pipe.send(value)
print("Value sent: {}".format(value))

def reader(pipe):
f = open("output.txt", "w")
while True:
value = pipe.recv()
f.write(str(value))
print(".")

input_p, output_p = Pipe()
p1 = Process(target=generate, args=(input_p,))
p2 = Process(target=reader, args=(output_p,))

p1.start()
p2.start()

multiprocessing.Event 与 threading.Event 类似。但是 Event 对象不能被传递到子进程函数中,信号对象只能通过继承机制在进程间共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Event, Pool
import time

def worker
print "A - %s" % (time.time())
rint("A - {}".format(time.time()))
event.clear()
else:
time.sleep(0.1)
print("B - {}".format(time.time()))
event.set()

event = Event()
event.set()

pool = Pool(3)
pool.map(worker, range(9))

总结

多进程和多线程是两种多任务处理方式,它们各自有各自的特性和优缺点,而具体如何选择完全由开发者自行决定。

由于它们适用于不同的场景,所以并非一个绝对比另一个好,虽然它们看起来像是在解决同样的问题。


Share