multiprocessing

进程 线程 协程

python网络编程的两大必学模块socket和socketserver,其中的socketserver是一个支持IO多路复用和多线程、多进程的模块。
一般我们在socketserver服务端代码中都会写这么一句:

server = socketserver.ThreadingTCPServer(settings.IP_PORT, MyServer)

ThreadingTCPServer这个类是一个支持多线程和TCP协议的socketserver,它的继承关系是这样的:

class ThreadingTCPServer(ThreadingMixIn, TCPServer): 
    pass

右边的TCPServer实际上是它主要的功能父类,而左边的ThreadingMixIn则是实现了多线程的类,它自己本身则没有任何代码。
MixIn在python的类命名中,很常见,一般被称为“混入”,戏称“乱入”,通常为了某种重要功能被子类继承。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ThreadingMixIn:

daemon_threads = False

def process_request_thread(self, request, client_address):
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

def process_request(self, request, client_address):

t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()

在ThreadingMixIn类中,其实就定义了一个属性,两个方法。在process_request方法中实际调用的正是python内置的多线程模块threading。这个模块是python中所有多线程的基础,socketserver本质上也是利用了这个模块。

1. thread

线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不独立拥有系统资源,但它可与同属一个进程的其它线程共享该进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。每一个应用程序都至少有一个进程和一个线程。线程是程序中一个单一的顺序控制流程。在单个程序中同时运行多个线程完成不同的被划分成一块一块的工作,称为多线程。

以上那一段,可以不用看!举个例子,厂家要生产某个产品,在它的生产基地建设了很多厂房,每个厂房内又有多条流水生产线。所有厂房配合将整个产品生产出来,某个厂房内的所有流水线将这个厂房负责的产品部分生产出来。每个厂房拥有自己的材料库,厂房内的生产线共享这些材料。而每一个厂家要实现生产必须拥有至少一个厂房一条生产线。那么这个厂家就是某个应用程序;每个厂房就是一个进程;每条生产线都是一个线程。

python 主要是 threading 库

普通的多线程

1
2
3
4
5
6
7
8
9
10
11
12
import threading
import time

def show(arg):
time.sleep(1)
print('thread'+str(arg))

for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start()

print('main thread stop')

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
下面是Thread类的主要方法:

  • start 线程准备就绪,等待CPU调度
  • setName 为线程设置名称
  • getName 获取线程名称
  • setDaemon 设置为后台线程或前台线程(默认是False,前台线程)
    如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止。
  • join 该方法非常重要。它的存在是告诉主线程,必须在这个位置等待子线程执行完毕后,才继续进行主线程的后面的代码。但是当setDaemon为True时,join方法是无效的。
  • run 线程被cpu调度后自动执行线程对象的run方法

python的GIL,也就是全局解释器锁。在编程语言的世界,python因为GIL的问题广受诟病,因为它在解释器的层面限制了程序在同一时间只有一个线程被CPU实际执行,而不管你的程序里实际开了多少条线程。所以我们经常能发现,python中的多线程编程有时候效率还不如单线程,就是因为这个原因。那么,对于这个GIL,一些普遍的问题如下:

  • 为什么有 GIL

作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题。解释器要留意的是避免在不同的线程操作内部共享的数据。同时它还要保证在管理用户线程时总是有最大化的计算资源。那么,不同线程同时访问时,数据的保护机制是怎样的呢?答案是解释器全局锁GIL。GIL对诸如当前线程状态和为垃圾回收而用的堆分配对象这样的东西的访问提供着保护。

  • 为什么不能去掉

首先,在早期的python解释器依赖较多的全局状态,传承下来,使得想要移除当今的GIL变得更加困难。其次,对于程序员而言,仅仅是想要理解它的实现就需要对操作系统设计、多线程编程、C语言、解释器设计和CPython解释器的实现有着非常彻底的理解。
在1999年,针对Python1.5,一个“freethreading”补丁已经尝试移除GIL,用细粒度的锁来代替。然而,GIL的移除给单线程程序的执行速度带来了一定的负面影响。当用单线程执行时,速度大约降低了40%。虽然使用两个线程时在速度上得到了提高,但这个提高并没有随着核数的增加而线性增长。因此这个补丁没有被采纳。
另外,在python的不同解释器实现中,如PyPy就移除了GIL,其执行速度更快(不单单是去除GIL的原因)。然而,我们通常使用的CPython占有着统治地位的使用量,所以,你懂的。
在Python 3.2中实现了一个新的GIL,并且带着一些积极的结果。这是自1992年以来,GIL的一次最主要改变。旧的GIL通过对Python指令进行计数来确定何时放弃GIL。在新的GIL实现中,用一个固定的超时时间来指示当前的线程以放弃这个锁。在当前线程保持这个锁,且当第二个线程请求这个锁的时候,当前线程就会在5ms后被强制释放掉这个锁(这就是说,当前线程每5ms就要检查其是否需要释放这个锁)。当任务是可行的时候,这会使得线程间的切换更加可预测。

  • 实际建议

建议在IO密集型任务中使用多线程,

在CPU密集型任务中使用多进程。

深入研究python的协程机制,你会有惊喜的。

通常而言,队列是一种先进先出的数据结构,与之对应的是堆栈这种后进先出的结构。但是在python中,它内置了一个queue模块,它不但提供普通的队列,还提供一些特殊的队列。具体如下:

  • queue.Queue :先进先出队列
  • queue.LifoQueue :后进先出队列
  • queue.PriorityQueue :优先级队列
  • queue.deque :双向队列

生产者 消费者

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang
import time
import queue
import threading

q = queue.Queue(10)
def productor(i):
while True:
q.put("厨师 %s 做的包子!"%i)
time.sleep(2)

def consumer(k):
while True:
print("顾客 %s 吃了一个 %s"%(k,q.get()))
time.sleep(1)

for i in range(3):
t = threading.Thread(target=productor,args=(i,))
t.start()

for k in range(10):
v = threading.Thread(target=consumer,args=(k,))
v.start()

线程池

在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,让过来的任务立刻能够使用,就形成了线程池。在python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。下面是一个简单的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang

import queue
import time
import threading

class MyThreadPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._q = queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)

def get_thread(self):
return self._q.get()

def add_thread(self):
self._q.put(threading.Thread)

def task(i, pool):
print(i)
time.sleep(1)
pool.add_thread()

pool = MyThreadPool(5)
for i in range(100):
t = pool.get_thread()
obj = t(target=task, args=(i,pool))
obj.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
一个基于thread和queue的线程池,以任务为队列元素,动态创建线程,重复利用线程,
通过close和terminate方法关闭线程池。
"""

import queue
import threading
import contextlib
import time

# 创建空对象,用于停止线程
StopEvent = object()


def callback(status, result):
"""
根据需要进行的回调函数,默认不执行。
:param status: action函数的执行状态
:param result: action函数的返回值
:return:
"""

pass


def action(thread_name,arg):
"""
真实的任务定义在这个函数里
:param thread_name: 执行该方法的线程名
:param arg: 该函数需要的参数
:return:
"""

# 模拟该函数执行了0.1秒
time.sleep(0.1)
print("第%s个任务调用了线程 %s,并打印了这条信息!" % (arg+1, thread_name))


class ThreadPool:

def __init__(self, max_num, max_task_num=None):
"""
初始化线程池
:param max_num: 线程池最大线程数量
:param max_task_num: 任务队列长度
"""

# 如果提供了最大任务数的参数,则将队列的最大元素个数设置为这个值。
if max_task_num:
self.q = queue.Queue(max_task_num)
# 默认队列可接受无限多个的任务
else:
self.q = queue.Queue()
# 设置线程池最多可实例化的线程数
self.max_num = max_num
# 任务取消标识
self.cancel = False
# 任务中断标识
self.terminal = False
# 已实例化的线程列表
self.generate_list = []
# 处于空闲状态的线程列表
self.free_list = []

def put(self, func, args, callback=None):
"""
往任务队列里放入一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""

# 先判断标识,看看任务是否取消了
if self.cancel:
return
# 如果没有空闲的线程,并且已创建的线程的数量小于预定义的最大线程数,则创建新线程。
if len(self.free_list) == 0 and len(self.generate_list) self.max_num:
self.generate_thread()
# 构造任务参数元组,分别是调用的函数,该函数的参数,回调函数。
w = (func, args, callback,)
# 将任务放入队列
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""

# 每个线程都执行call方法
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数。在正常情况下,每个线程都保存生存状态,
直到获取线程终止的flag。
"""

# 获取当前线程的名字
current_thread = threading.currentThread().getName()
# 将当前线程的名字加入已实例化的线程列表中
self.generate_list.append(current_thread)
# 从任务队列中获取一个任务
event = self.q.get()
# 让获取的任务不是终止线程的标识对象时
while event != StopEvent:
# 解析任务中封装的三个参数
func, arguments, callback = event
# 抓取异常,防止线程因为异常退出
try:
# 正常执行任务函数
result = func(current_thread, *arguments)
success = True
except Exception as e:
# 当任务执行过程中弹出异常
result = None
success = False
# 如果有指定的回调函数
if callback is not None:
# 执行回调函数,并抓取异常
try:
callback(success, result)
except Exception as e:
pass
# 当某个线程正常执行完一个任务时,先执行worker_state方法
with self.worker_state(self.free_list, current_thread):
# 如果强制关闭线程的flag开启,则传入一个StopEvent元素
if self.terminal:
event = StopEvent
# 否则获取一个正常的任务,并回调worker_state方法的yield语句
else:
# 从这里开始又是一个正常的任务循环
event = self.q.get()
else:
# 一旦发现任务是个终止线程的标识元素,将线程从已创建线程列表中删除
self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,让所有线程都停止的方法
"""

# 设置flag
self.cancel = True
# 计算已创建线程列表中线程的个数,然后往任务队列里推送相同数量的终止线程的标识元素
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
在任务执行过程中,终止线程,提前退出。
"""

self.terminal = True
# 强制性的停止线程
while self.generate_list:
self.q.put(StopEvent)

# 该装饰器用于上下文管理
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录空闲的线程,或从空闲列表中取出线程处理任务
"""

# 将当前线程,添加到空闲线程列表中
state_list.append(worker_thread)
# 捕获异常
try:
# 在此等待
yield
finally:
# 将线程从空闲列表中移除
state_list.remove(worker_thread)

# 调用方式
if __name__ == '__main__':
# 创建一个最多包含5个线程的线程池
pool = ThreadPool(5)
# 创建100个任务,让线程池进行处理
for i in range(100):
pool.put(action, (i,), callback)
# 等待一定时间,让线程执行任务
time.sleep(3)
print("-" * 50)
print("任务停止之前线程池中有%s个线程,空闲的线程有%s个 "
% (len(pool.generate_list), len(pool.free_list)))
# 正常关闭线程池
pool.close()
print("任务执行完毕,正常退出!")
# 强制关闭线程池
# pool.terminate()
# print("强制停止任务!")

2. process

在python中multiprocess模块提供了Process类,实现进程相关的功能。但是,由于它是基于fork机制的,因此不被windows平台支持。想要在windows中运行,必须使用if __name == ‘\main__:的方式,显然这只能用于调试和学习,不能用于实际环境。
(PS:在这里我必须吐槽一下python的包、模块和类的组织结构。在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在python中很多,新手容易傻傻分不清。)

想要进程之间进行资源共享可以使用queues/Array/Manager这三个multiprocess模块提供的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process
from multiprocessing import Array

def Foo(i,temp):
temp[0] += 100
for item in temp:
print(i,'----->',item)

if __name__ == '__main__':
temp = Array('i', [11, 22, 33, 44])
for i in range(2):
p = Process(target=Foo, args=(i,temp))
p.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process,Manager

def Foo(i,dic):
dic[i] = 100+i
print(dic.values())

if __name__ == '__main__':
manage = Manager()
dic = manage.dict()
for i in range(10):
p = Process(target=Foo, args=(i,dic))
p.start()
p.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def foo(i,arg):
arg.put(i)
print('The Process is ', i, "and the queue's size is ", arg.qsize())

if __name__ == "__main__":
li = queues.Queue(20, ctx=multiprocessing)
for i in range(10):
p = Process(target=foo, args=(i,li,))
p.start()

这里就有点类似上面的队列了。从运行结果里,你还能发现数据共享中存在的脏数据问题。另外,比较悲催的是multiprocessing里还有一个Queue,一样能实现这个功能。

3. coroutine

线程和进程的操作是由程序触发系统接口,最后的执行者是系统,它本质上是操作系统提供的功能。而协程的操作则是程序员指定的,在python中通过yield,人为的实现并发处理。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时。协程,则只使用一个线程,分解一个线程成为多个“微线程”,在一个线程中规定某个代码块的执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO)。
在不需要自己“造轮子”的年代,同样有第三方模块为我们提供了高效的协程,这里介绍一下greenlet和gevent。本质上,gevent是对greenlet的高级封装,因此一般用它就行,这是一个相当高效的模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
print('GET: %s' % url)
resp = requests.get(url)
data = resp.text
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])