上药三品,神与气精

曾因酒醉鞭名马 生怕情多累美人


  • 首页

  • 关于

  • 分类

  • 标签

  • 归档

  • 搜索

asyncio篇

发表于 2018-12-08 | 阅读次数:
字数统计: 756 | 阅读时长 ≈ 2

Python 3.5添加了async和await这两个关键字,分别用来替换asyncio.coroutine和yield from。自此,协程成为新的语法,而不再是一种生成器类型了。

事件循环与协程的引入,可以极大提高高负载下程序的I/O性能。除此之外还增加了async with(异步上下文管理)、async for(异步迭代器)语法。特别说的是,在Python 3.6里面终于可以用异步生成器了!

1.给一个函数添加了async关键字,就会把它变成一个异步函数。
2.每个线程有一个事件循环,主线程调用asyncio.get_event_loop时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete方法,事件循环会安排协同程序的执行。和方法名字一样,异步的任务完成方法才会就执行完成了。
3.为了在asyncio中使用concurrent.futures的执行器,我这用到了run_in_executor,它可以接收要同步执行的任务。
4.给task设置num属性,是因为后面的completed中的Future对象只包含结果,但是我们并不知道num是什么,所以hack了下,之后的例子中会有其他的方案,本文是给大家提供各种解题的思路,在合适的场景还是有用处的。
5.await asyncio.wait(blocking_tasks)就是协同的执行那些同步的任务,直到完成。
6.最后根据
num找到和执行结果的对应关系,排序然后打印结果。

async/await是Python提供的异步编程API,而asyncio只是一个利用 async/await API进行异步编程的框架

现存的一些库其实并不能原生的支持asyncio(因为会发生阻塞或者功能不可用),比如requests,如果要写爬虫,配合asyncio的应该用aiohttp,其他的如数据库驱动等各种Python对应的库也都得使用对应的aioXXX版本了

简单的说,进程/线程是操作系统充当了EventLoop调度,而协程是自己用epoll进行调度。

协程是异步非阻塞的另外一种展现形式。Golang,Erlang,Lua协程都是这个模型。

协程之间的切换,往往是用户通过代码来显式指定的(跟各种 callback 类似),不需要内核参与,可以很方便的实现异步。

协程本质上也是异步非阻塞技术,它是将事件回调进行了包装,让程序员看不到里面的事件循环。程序员就像写阻塞代码一样简单。比如调用 client->recv() 等待接收数据时,就像阻塞代码一样写。实际上是底层库在执行recv时悄悄保存了一个状态,比如代码行数,局部变量的值。然后就跳回到EventLoop中了。什么时候真的数据到来时,它再把刚才保存的代码行数,局部变量值取出来,又开始继续执行。

why-not-gevent

发表于 2018-12-08 | 阅读次数:
字数统计: 1.1k | 阅读时长 ≈ 3

在python 的并发编程领域 以前有tornado 后来有gevent

python 在发展的过程当中 有过一些失败的修复CPython 的缺陷和提高性能的尝试,比如消除GIL(这么多年 这么多的大牛 没有一个解决的方案???) 也有成功的案例 比如 Pypy

协程

每个人都在谈论协程的好处 优点 为了KPI 强行上协程 并不可取。

Coroutine 也就是 corporate routine,直译为「协同的例程」,中文一般叫做「协程」, 实际上这个概念和进程与线程有相似之处, 因为linux线程就是所谓的「轻量级进程」。

gevent源码分析的描述当中

相同点:
二者都是可以看做是一种执行流, 该执行流可以挂起,并且在将来又可以在 你挂起的地方恢复执行, 这实际上都可以看做是continuation, 我们来看看当我们挂 起一个执行流时我们要保存的东西:
栈, 因为如果你不保存栈,那么局部变量你就无法恢复,同时函数的调用链你也无 法恢复,
寄存器的状态: 这好理解, 比如说EIP,如果你不保存,那么你恢复执行流就不知道 到底执行哪一条指令, 在比如说ESP,EBP, 如果你不保存,那么你即便有完整的栈 你也不知道怎么用.
这二者实际就是所谓的上下文,也可以说是continuation. 在执行流切换时必须保存 这两个东西, 内核调度进程时也是一回事.

不同点:
执行流的调度者不同, 进程是内核调度, 而协程是在用户态调度, 也就是说进程 的上下文是在内核态保存恢复的,而协程是在用户态保存恢复的. 很显然用户态的 代价更低
进程会被抢占,而协程不会,也就是说协程如果不主动让出CPU,那么其他的协程是不 可能得到执行机会,这实际和早期的操作系统类似,比如DOS, 它有一个yield原语, 一个进程调用yield,那么它就会让出CPU, 其他的进程也就有机会执行了, 如果一 个进程进入了死循环,那么整个系统也就挂起了,永远无法运行其他的进程了, 但 对协程而言,这不是问题
对内存的占用不同,实际上协程可以只需要4K的栈就够了, 而进程占用的内存要大 的多.
从操作系统的角度讲, 多协程的程序是单线程,单进程的

协程的优势在于

由开发者决定协程的切换,操作系统无法干预切换,且占用内存小的多。

Gevent是一种基于协程的Python网络库,它用到Greenlet提供的,封装了libevent事件循环的高层同步API。它让开发者在不改变编程习惯的同时,用同步的方式写异步I/O的代码。

gevent 缺点

  • Monkey-patching。中文「猴子补丁」,常用于对测试环境做一些hack。我个人不太喜欢这种「黑魔法」,因为如果其他人不了解细节,极为容易产生困惑。Gvanrossum说用它就是”patch-and-pray”,太形象了。由于Gevent直接修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。但是我们无法保证你在复杂的生产环境中有哪些地方使用这些标准库会由于打了补丁而出现奇怪的问题,那么你只能祈祷(pray)了。其次,在Python之禅中明确说过:「Explicit is better than implicit.」,猴子补丁明显的背离了这个原则。最后,Gvanrossum说Stackless之父Christian Tismer也赞同他。 我喜欢显式的「yield from」
  • 第三方库支持。得确保项目中用到其他用到的网络库也必须使用纯Python或者明确说明支持Gevent,而且就算有这样的第三方库,我还会担心这个第三方库的代码质量和功能性。
  • Greenlet不支持Jython和IronPython,这样就无法把gevent设计成一个标准库了。

建议py 3.6 之后 选择标准库 asyncio

协程篇

发表于 2018-12-05 | 分类于 web | 阅读次数:
字数统计: 436 | 阅读时长 ≈ 1

协程其实就是可以由程序自主控制的线程

在python里主要由yield 和yield from 控制,可以通过生成者消费者例子来理解协程

利用yield from 向生成器(协程)传送数据

传统的生产者-消费者是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。

如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,换回生产者继续生产,效率极高

1
2
3
4
5
6
7
8
9
10
11
12

import asyncio


async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")


loop=asyncio.get_event_loop()
loop.run_until_complete(hello())

进程
   启动多个进程 进程之间是由操作系统负责调用
线程
   启动多个线程 真正被CPU执行的最小单位实际是线程
开启一个线程 创建一个线程 寄存器 堆栈
关闭一个线程
协程
本质上是一个线程
能够在多个任务之间切换来节省一些IO时间
协程中任务之间的切换也消耗时间,但是开销要远远小于进程线程之间的切换

在高IO的时候可以使用 例如爬虫, 爬虫需要请求很多url,使用协程可以让请求同时发出,而不会因为在等待一个url的请求响应而阻塞程序 基本上就是多线程适用的场景!!!

不适用于高计算的环境, 因为在计算时cpu是一直工作的, 频繁的切换运行的程序,会白白增加切换程序的时间,导致计算效率下降

进程篇

发表于 2018-12-05 | 分类于 web | 阅读次数:
字数统计: 702 | 阅读时长 ≈ 3

多线程并不能充分利用多核处理器,如果是一个CPU计算型的任务,应该使用多进程模块 multiprocessing 。它的工作方式与线程库完全不同,但是两种库的语法却非常相似。multiprocessing给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。

1
2
from multiprocessing import Pool
from multiprocessing.dummy import Pool

分不清楚的情况下 进行测试 一般可以先直接上多进程

进程之间的通信

进程间的通信(IPC)常用的是rpc、socket、pipe(管道)和消息队列(queue)。多进程模块中涉及到了后面3种。

  • 管道pipe
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process, Pipe


def f(conn):
conn.send(['hello'])
conn.close()


parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv()
p.join()
  • queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import time
from multiprocessing import Process, JoinableQueue, Queue
from random import random


tasks_queue = JoinableQueue()
results_queue = Queue()


def double(n):
return n * 2


def producer(in_queue):
while 1:
wt = random()
time.sleep(wt)
in_queue.put((double, wt))
if wt > 0.9:
in_queue.put(None)
print 'stop producer'
break


def consumer(in_queue, out_queue):
while 1:
task = in_queue.get()
if task is None:
break
func, arg = task
result = func(arg)
in_queue.task_done()
out_queue.put(result)

processes = []

p = Process(target=producer, args=(tasks_queue,))
p.start()
processes.append(p)

p = Process(target=consumer, args=(tasks_queue, results_queue))
p.start()
processes.append(p)

tasks_queue.join()

for p in processes:
p.join()

while 1:
if results_queue.empty():
break
result = results_queue.get()
print 'Result:', result
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

from multiprocessing import Queue, Process, cpu_count


def apply_func(f, q_in, q_out):
while not q_in.empty():
i, item = q_in.get()
q_out.put((i, f(item)))


def parmap(f, items, nprocs = cpu_count()):
q_in, q_out = Queue(), Queue()
proc = [Process(target=apply_func, args=(f, q_in, q_out))
for _ in range(nprocs)]
sent = [q_in.put((i, item)) for i, item in enumerate(items)]
[p.start() for p in proc]
res = [q_out.get() for _ in sent]
[p.join() for p in proc]

return [item for _, item in sorted(res)]

同步机制

multiprocessing的Lock、Condition、Event、RLock、Semaphore等同步原语和threading模块的机制是一样的,用法也类似,

进程之间共享状态

共享内存(value array)

服务器进程

常见的共享方式有以下几种:

Namespace。创建一个可分享的命名空间。
Value/Array。和上面共享ctypes对象的方式一样。
dict/list。创建一个可分享的dict/list,支持对应数据结构的方法。
Condition/Event/Lock/Queue/Semaphore。创建一个可分享的对应同步原语的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Manager, Process

def modify(ns, lproxy, dproxy):
ns.a **= 2
lproxy.extend(['b', 'c'])
dproxy['b'] = 0


manager = Manager()
ns = manager.Namespace()
ns.a = 1
lproxy = manager.list()
lproxy.append('a')
dproxy = manager.dict()
dproxy['b'] = 2

p = Process(target=modify, args=(ns, lproxy, dproxy))
p.start()
print 'PID:', p.pid
p.join()

print ns.a
print lproxy
print dproxy

分布式进程通信

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 9030
authkey = 'secret'

shared_list = []


class RemoteManager(BaseManager):
pass


RemoteManager.register('get_list', callable=lambda: shared_list)
mgr = RemoteManager(address=(host, port), authkey=authkey)
server = mgr.get_server()
server.serve_forever()

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 9030
authkey = 'secret'


class RemoteManager(BaseManager):
pass


RemoteManager.register('get_list')
mgr = RemoteManager(address=(host, port), authkey=authkey)
mgr.connect()

l = mgr.get_list()
print l
l.append(1)
print mgr.get_list()

线程篇

发表于 2018-12-05 | 分类于 web | 阅读次数:
字数统计: 981 | 阅读时长 ≈ 4

GIL是必须的,这是Python设计的问题:Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/O操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高(但是当它使用多进程库时,性能可以获得提高)。

那是不是由于GIL的存在,多线程库就是个「鸡肋」呢?当然不是。事实上我们平时会接触非常多的和网络通信或者数据输入/输出相关的程序,比如网络爬虫、文本处理等等。这时候由于网络情况和I/O的性能的限制,Python解释器会等待读写数据的函数调用返回,这个时候就可以利用多线程库提高并发效率了。

同步机制

  • 信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from random import random
from threading import Thread, Semaphore

sema = Semaphore(3)


def foo(tid):
with sema:
print '{} acquire sema'.format(tid)
wt = random() * 2
time.sleep(wt)
print '{} release sema'.format(tid)


threads = []

for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()
  • 锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from threading import Thread, Lock

value = 0
lock = Lock()


def getlock():
global value
with lock:
new = value + 1
time.sleep(0.001)
value = new

threads = []

for i in range(100):
t = Thread(target=getlock)
t.start()
threads.append(t)

for t in threads:
t.join()

print value
  • 可重入锁 RLock

acquire() 能够不被阻塞的被同一个线程调用多次。但是要注意的是release()需要调用与acquire()相同的次数才能释放锁。

  • 条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import threading

def consumer(cond):
t = threading.currentThread()
with cond:
cond.wait() # wait()方法创建了一个名为waiter的锁,并且设置锁的状态为locked。这个waiter锁用于线程间的通讯
print '{}: Resource is available to consumer'.format(t.name)


def producer(cond):
t = threading.currentThread()
with cond:
print '{}: Making resource available'.format(t.name)
cond.notifyAll() # 释放waiter锁,唤醒消费者


condition = threading.Condition()

c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()
  • event
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# coding=utf-8
import time
import threading
from random import randint


TIMEOUT = 2

def consumer(event, l):
t = threading.currentThread()
while 1:
event_is_set = event.wait(TIMEOUT)
if event_is_set:
try:
integer = l.pop()
print '{} popped from list by {}'.format(integer, t.name)
event.clear() # 重置事件状态
except IndexError: # 为了让刚启动时容错
pass


def producer(event, l):
t = threading.currentThread()
while 1:
integer = randint(10, 100)
l.append(integer)
print '{} appended to list by {}'.format(integer, t.name)
event.set() # 设置事件
time.sleep(1)


event = threading.Event()
l = []

threads = []

for name in ('consumer1', 'consumer2'):
t = threading.Thread(name=name, target=consumer, args=(event, l))
t.start()
threads.append(t)

p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)

for t in threads:
t.join()
  • 队列(示例为优先级队列)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import time
import threading
from random import randint
from Queue import PriorityQueue


q = PriorityQueue()


def double(n):
return n * 2


def producer():
count = 0
while 1:
if count > 5:
break
pri = randint(0, 100)
print 'put :{}'.format(pri)
q.put((pri, double, pri)) # (priority, func, args)
count += 1


def consumer():
while 1:
if q.empty():
break
pri, task, arg = q.get()
print '[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg))
q.task_done()
time.sleep(0.1)


t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

实现进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# coding=utf-8
import time
import threading
from random import random
from Queue import Queue


def double(n):
return n * 2


class Worker(threading.Thread):
def __init__(self, queue):
super(Worker, self).__init__()
self._q = queue
self.daemon = True
self.start()
def run(self):
while 1:
f, args, kwargs = self._q.get()
try:
print 'USE: {}'.format(self.name) # 线程名字
print f(*args, **kwargs)
except Exception as e:
print e
self._q.task_done()


class ThreadPool(object):
def __init__(self, num_t=5):
self._q = Queue(num_t)
# Create Worker Thread
for _ in range(num_t):
Worker(self._q)
def add_task(self, f, *args, **kwargs):
self._q.put((f, args, kwargs))
def wait_complete(self):
self._q.join()


pool = ThreadPool()
for _ in range(8):
wt = random()
pool.add_task(double, wt)
time.sleep(wt)
pool.wait_complete()
1…666768…109
John Cheung

John Cheung

improve your python skills

543 日志
33 分类
45 标签
RSS
GitHub Email
© 2020 John Cheung
本站访客数:
|
主题 — NexT.Pisces v5.1.4
博客全站共226.3k字