Processes

Multithreading cannot fully exploit multiple cores, so a CPU-bound task should use the multiprocessing module instead. It works completely differently from the threading library, but the two libraries expose very similar APIs. multiprocessing gives each process its own Python interpreter, which sidesteps the problems caused by the global interpreter lock (GIL).

from multiprocessing import Pool                      # process pool
from multiprocessing.dummy import Pool as ThreadPool  # thread pool with the same interface

If it is not clear which one a task needs, benchmark both; as a rule of thumb, start with multiprocessing first.
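
As a quick, illustrative sketch (square here is just a stand-in for any CPU-bound function), the process pool's map looks exactly like the thread pool's, so switching between the two is a one-line import change:

from multiprocessing import Pool


def square(n):  # stand-in for CPU-bound work
    return n * n


if __name__ == '__main__':
    with Pool(4) as pool:  # 4 worker processes
        print(pool.map(square, range(10)))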

Communication Between Processes

Common inter-process communication (IPC) mechanisms are RPC, sockets, pipes, and message queues. The multiprocessing module covers the latter three.

  • Pipe
from multiprocessing import Process, Pipe


def f(conn):
    conn.send(['hello'])  # any picklable object can be sent
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # two connected endpoints
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # -> ['hello']
    p.join()
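
By default Pipe() is duplex: both endpoints can send and receive. Pass duplex=False to get a one-way pipe whose first endpoint can only receive and whose second can only send.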
  • Queue
import time
from random import random
from multiprocessing import Process, JoinableQueue, Queue

tasks_queue = JoinableQueue()
results_queue = Queue()


def double(n):
    return n * 2


def producer(in_queue):
    while True:
        wt = random()
        time.sleep(wt)
        in_queue.put((double, wt))  # a task is a (function, argument) pair
        if wt > 0.9:
            in_queue.put(None)  # sentinel: tells the consumer to stop
            print('stop producer')
            break


def consumer(in_queue, out_queue):
    while True:
        task = in_queue.get()
        if task is None:
            in_queue.task_done()  # account for the sentinel, or join() below never returns
            break
        func, arg = task
        result = func(arg)
        in_queue.task_done()
        out_queue.put(result)


if __name__ == '__main__':
    processes = []

    p = Process(target=producer, args=(tasks_queue,))
    p.start()
    processes.append(p)

    p = Process(target=consumer, args=(tasks_queue, results_queue))
    p.start()
    processes.append(p)

    tasks_queue.join()  # blocks until task_done() has been called for every item

    for p in processes:
        p.join()

    while not results_queue.empty():
        print('Result:', results_queue.get())

from multiprocessing import Queue, Process, cpu_count


def apply_func(f, q_in, q_out):
    while True:
        task = q_in.get()
        if task is None:  # sentinel: no more work for this worker
            break
        i, item = task
        q_out.put((i, f(item)))


def parmap(f, items, nprocs=cpu_count()):
    q_in, q_out = Queue(), Queue()
    procs = [Process(target=apply_func, args=(f, q_in, q_out))
             for _ in range(nprocs)]
    sent = [q_in.put((i, item)) for i, item in enumerate(items)]
    for _ in procs:
        q_in.put(None)  # one sentinel per worker
    for p in procs:
        p.start()
    res = [q_out.get() for _ in sent]  # drain results before join() to avoid deadlock
    for p in procs:
        p.join()
    return [item for _, item in sorted(res)]  # sort by index to restore input order

Synchronization Primitives

multiprocessing's Lock, Condition, Event, RLock, Semaphore, and other synchronization primitives work the same way as their counterparts in the threading module, and their usage is similar.
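
For example, a Lock can serialize access to a shared resource such as stdout. A minimal sketch (the printer helper is illustrative, not part of the module):

from multiprocessing import Process, Lock


def printer(lock, text):
    with lock:  # same context-manager protocol as threading.Lock
        print(text)


if __name__ == '__main__':
    lock = Lock()
    procs = [Process(target=printer, args=(lock, 'line %d' % i))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()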

Sharing State Between Processes

Shared memory (Value/Array)
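
A minimal sketch of the shared-ctypes pattern from the standard library docs; 'd' and 'i' are typecodes for double and signed int:

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927          # visible to the parent: the memory is shared
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)        # shared double
    arr = Array('i', range(10))  # shared int array
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)             # 3.1415927
    print(arr[:])                # [0, -1, -2, ..., -9]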

Server process (Manager)

A manager supports the following common ways of sharing:

  • Namespace: creates a shareable namespace.
  • Value/Array: shares ctypes objects, as in the shared-memory approach above.
  • dict/list: creates a shareable dict/list that supports the usual methods of that data structure.
  • Condition/Event/Lock/Queue/Semaphore: creates a shareable version of the corresponding synchronization primitive.

from multiprocessing import Manager, Process


def modify(ns, lproxy, dproxy):
    ns.a **= 2
    lproxy.extend(['b', 'c'])
    dproxy['b'] = 0


if __name__ == '__main__':
    manager = Manager()
    ns = manager.Namespace()
    ns.a = 1
    lproxy = manager.list()
    lproxy.append('a')
    dproxy = manager.dict()
    dproxy['b'] = 2

    p = Process(target=modify, args=(ns, lproxy, dproxy))
    p.start()
    print('PID:', p.pid)
    p.join()

    print(ns.a)    # 1  (1 ** 2)
    print(lproxy)  # ['a', 'b', 'c']
    print(dproxy)  # {'b': 0}

Distributed Process Communication

Server

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 9030
authkey = b'secret'  # authkey must be bytes

shared_list = []


class RemoteManager(BaseManager):
    pass


# Expose shared_list; clients that call get_list() receive a proxy to it.
RemoteManager.register('get_list', callable=lambda: shared_list)
mgr = RemoteManager(address=(host, port), authkey=authkey)
server = mgr.get_server()
server.serve_forever()

Client

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 9030
authkey = b'secret'  # must match the server's authkey


class RemoteManager(BaseManager):
    pass


RemoteManager.register('get_list')  # no callable here: the server provides it
mgr = RemoteManager(address=(host, port), authkey=authkey)
mgr.connect()

l = mgr.get_list()  # a proxy for the list living in the server process
print(l)
l.append(1)
print(mgr.get_list())
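
Start the server first, then run the client in another shell (or on another machine, with host changed accordingly). Both get_list() calls return proxies to the same list inside the server process, so the appended 1 is visible in the second print.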