线程篇

GIL是必须的,这是Python设计的问题:Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/O操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高(但是当它使用多进程库时,性能可以获得提高)。

那是不是由于GIL的存在,多线程库就是个「鸡肋」呢?当然不是。事实上我们平时会接触非常多的和网络通信或者数据输入/输出相关的程序,比如网络爬虫、文本处理等等。这时候由于网络情况和I/O的性能的限制,Python解释器会等待读写数据的函数调用返回,这个时候就可以利用多线程库提高并发效率了。

同步机制

  • 信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from random import random
from threading import Thread, Semaphore

sema = Semaphore(3)


def foo(tid):
with sema:
print '{} acquire sema'.format(tid)
wt = random() * 2
time.sleep(wt)
print '{} release sema'.format(tid)


threads = []

for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from threading import Thread, Lock

value = 0
lock = Lock()


def getlock():
global value
with lock:
new = value + 1
time.sleep(0.001)
value = new

threads = []

for i in range(100):
t = Thread(target=getlock)
t.start()
threads.append(t)

for t in threads:
t.join()

print value
  • 可重入锁 RLock

acquire() 能够不被阻塞的被同一个线程调用多次。但是要注意的是release()需要调用与acquire()相同的次数才能释放锁。

  • 条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import threading

def consumer(cond):
t = threading.currentThread()
with cond:
cond.wait() # wait()方法创建了一个名为waiter的锁,并且设置锁的状态为locked。这个waiter锁用于线程间的通讯
print '{}: Resource is available to consumer'.format(t.name)


def producer(cond):
t = threading.currentThread()
with cond:
print '{}: Making resource available'.format(t.name)
cond.notifyAll() # 释放waiter锁,唤醒消费者


condition = threading.Condition()

c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()
  • event
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# coding=utf-8
import time
import threading
from random import randint


TIMEOUT = 2

def consumer(event, l):
t = threading.currentThread()
while 1:
event_is_set = event.wait(TIMEOUT)
if event_is_set:
try:
integer = l.pop()
print '{} popped from list by {}'.format(integer, t.name)
event.clear() # 重置事件状态
except IndexError: # 为了让刚启动时容错
pass


def producer(event, l):
t = threading.currentThread()
while 1:
integer = randint(10, 100)
l.append(integer)
print '{} appended to list by {}'.format(integer, t.name)
event.set() # 设置事件
time.sleep(1)


event = threading.Event()
l = []

threads = []

for name in ('consumer1', 'consumer2'):
t = threading.Thread(name=name, target=consumer, args=(event, l))
t.start()
threads.append(t)

p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)

for t in threads:
t.join()
  • 队列(示例为优先级队列)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import time
import threading
from random import randint
from Queue import PriorityQueue


q = PriorityQueue()


def double(n):
return n * 2


def producer():
count = 0
while 1:
if count > 5:
break
pri = randint(0, 100)
print 'put :{}'.format(pri)
q.put((pri, double, pri)) # (priority, func, args)
count += 1


def consumer():
while 1:
if q.empty():
break
pri, task, arg = q.get()
print '[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg))
q.task_done()
time.sleep(0.1)


t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

实现进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# coding=utf-8
import time
import threading
from random import random
from Queue import Queue


def double(n):
return n * 2


class Worker(threading.Thread):
def __init__(self, queue):
super(Worker, self).__init__()
self._q = queue
self.daemon = True
self.start()
def run(self):
while 1:
f, args, kwargs = self._q.get()
try:
print 'USE: {}'.format(self.name) # 线程名字
print f(*args, **kwargs)
except Exception as e:
print e
self._q.task_done()


class ThreadPool(object):
def __init__(self, num_t=5):
self._q = Queue(num_t)
# Create Worker Thread
for _ in range(num_t):
Worker(self._q)
def add_task(self, f, *args, **kwargs):
self._q.put((f, args, kwargs))
def wait_complete(self):
self._q.join()


pool = ThreadPool()
for _ in range(8):
wt = random()
pool.add_task(double, wt)
time.sleep(wt)
pool.wait_complete()