上药三品,神与气精

曾因酒醉鞭名马 生怕情多累美人


  • 首页

  • 关于

  • 分类

  • 标签

  • 归档

  • 搜索

taskflow详细2

发表于 2019-01-10 | 分类于 compute | 阅读次数:
字数统计: 814 | 阅读时长 ≈ 3

Engine的必要初始化参数一个是flow,一个就是backends,backends对应的是storage使用何种后端存储,我简化后的代码storage只支持sqlalchemy,所以不直接传入sqlalchemy的session即可,另外一个参数就是我们马上要说的flow

Engine是通过flow来找到需要执行的task的,所以,我们的task都是add到flow的

1
2
3
4
5
6
7
8
9
10
11
12
13

from simpleflow.patterns import graph_flow as gf

class Adder(task.Task):

def execute(self, x, y):
print 'do!!!', x, y
return x + y

gflow = gf.Flow('root')
gflow.add(Adder(name='atask'))
gflow2 = gf.Flow('leaf')
gflow.add(gflow2)

当然flow的add参数可以是task,还可以flow.

flow的代码比较复杂,关键属性_graph就是networkx的图了,不建议现在读代码,粗读一部分,稍微看一下对应的图和关键变量是什么就好和add部分就好。

flow一共有三种,使用到有向图和无向图,当flow调用add添加的task/flow的时候,就是在图中增加节点,对于有向图,添加节点后自动增加边

这里我就直接用我已经砍掉backends层的代码来说明了,我们来看storage里用到的几个对象(taskflow在persistence包,也就持久化包里,我的代码都集中到storage包里了)

Connection taskflow中,Connection上面还套了一层backends,backends就一个函数get_connection用于返回Connection实例, 因为原来代码可以支持文件、内存等接口,所以有这么一层Connection用于存放通用方法,具体有什么方法可以直接看代码我这里就不描述了,方法的名字很明确的描述了方法的作用

models中LogBook,FlowDetail,AtomDetail 原taskflow的models我改为了middleware,这里的models对应的是tables
这样改是为了统一orm框架写法,orm框架中都在models里放表结构
所以这里的这3个对象对应了三张表
LogBook:一次工作流有且只有一个LogBook,其实条记录用处不大,主要用来识别一次工作流,由于外键关系,删除一条logbook将会删除对应的其他表的记录,工作流正常结束后是可以删除相关记录的
FlowDetail:一次工作流只能接收一个flow,虽然前面我们flow中可以添加flow,但是并不会生成FlowDetail记录,顶层flow直接会获取下层flow的具体atom
AtomDetail: task和retry都是继承自atom,所以AtomDetail的的每一行就对应一个task或retry(retry后面单独说),还有之前说的用于传输初始参数的_TaskFlow_INJECTORD

middleware middleware里的LogBook,FlowDetail,AtomDetail就是models里表的dict版增加了一些方法,TaskDetail/RetryDetail自然也是继承了AtomDetail,它们都是存到AtomDetail里,为什么要拆两份而不统一起来呢?一个原因是原taskflow了因为要支持各种backends,所以分了一层出来。还有一个原因是在没有写入(task初始化,返回,取消,报错)的的情况下,图的相关动作都在内存中进行,直接用orm的对象太过笨重,middleware中的对应对象就轻量很多

现在我们来看Storage类了

我们看看engine里对Storage的初始化

1
2
3
4
5
6
7
8
9
10
11

@misc.cachedproperty
def storage(self):
def _scope_fetcher(atom_name):
if self._compiled:
return self._runtime.fetch_scopes_for(atom_name)
else:
return None
return storage.Storage(self._flow_detail,
connection=self.connection,
scope_fetcher=_scope_fetcher)

taskflow详细1

发表于 2019-01-10 | 分类于 compute | 阅读次数:
字数统计: 2.1k | 阅读时长 ≈ 7

taskflow,需要先熟悉两个最基本的概念

工作流 这玩意最直观的一个应用就是oa里审批,上面我们说的游戏业的常见操作也可以作为例子

停服 备份数据库 升级数据库 升级应用 启动服务器

上述操作也是一个工作流, 停服成功才能备份,备份成功才能升级, 升级应用和备份可以同时进行,升级数据库和升级应用都成功了才能启动服务器. 升级失败还要自动回滚.

有限状态机 推举导读. 工作流就是用状态机来实现流程控制和执行任务的

上面两条光看懂了还不行,还要写过一轮深入了解,taskflow使用了openstack的通用状态机项目,所以要能看taskflow先要熟悉openstack的通用状态机项目Automaton

为了熟悉Automaton,我用它写了一个tailfile,作用就是实现tail -f的效果,倒读N行和判断文件是否变化都用到状态机,因为状态和事件都比较少,用Automaton写这些功能有点杀鸡用牛刀,写这个项目的主要目的是用来熟悉状态机.

熟悉完Automaton以后,可以开始来看taskflow(基于最新版本2.14)了。

首先,因为taskflow作为通用项目而不是openstack内部项目,所以taskflow没有使用oslo中的一些通用工具

而且里面同样为了兼容写了一些复杂的接口还用了futurist实现异步,我为了和自己的其他几个项目统一起来并并,基于taskflow2.14生成了一个简化过的项目simpleflow

代码变化了几个如下部分

日志用回oslo_log的方式
删除了sqlalchemy以外的存储方式,table的和所有sql expression都改为orm方式(只支持mysql),后来我熟悉taskflow以后发现删除其他backends只保留mysql不是一个正确做法,有缺陷,后续会说明为什么
因为删除了sqlalchemy以外的存储方式, 所以persitence中backends就么有存在的必要了,直接传sqlalchemy的session即可
persitence中的内容和storage合并
jobs和conductors删除,这个部分可以和主要的taskflow功能无关
Executor不使用futurist,自己写了一个简化的futurist,代码兼容原来的写法,只支持协程,删除多线程和多进程相关支持
读写锁业复制了过来,用协程实现
砍掉了worker based的Engine
因为极大的简化了futurist和删掉了backends层所以比较好读一些,读代码的时候建议看我简化后的项目

我们先来看taskflow的几个主要单位

Engine

Engine是taskflow是启动口, 主要工作 1.创建状态机, 2.循环状态机, 3.在指定状态下通过Executor执行任务, 4.其他接口(存储、调度器等)的初始化

Engine分为好几种. work based的Engine比较特殊我们不看,只看action engine

其实几种action engine其实没有什么区别,通过Executor分为

序列化(阻塞,顺序执行)引擎
基于线程的并行引擎
基于协程的并行引擎
基于多进程的并行引擎
并行引擎的优势是,当个任务没有顺序关联的情况下可以同时执行多个任务

当然,引擎不影响任务之间的顺序关系,除非你想强制一个一个任务执行,否则都应该使用并行引擎

我自己的代码里简化以后只剩下序列化引擎和协程引擎,Executor是什么看下面

Executor

前面说了,Engine会通过Executor执行任务,因为如果Engine直接执行任务的话,整个状态机的循环会受到正在执行的任务的影响,所以包了一层Executor来执行具体的任务(当然具体代码里对Executor的应用会更复杂一点,为了扩展和异常处理包了3层)

在taskflow的源代码中Executor是通过futurist库来实现的,而futurist又是基于futures的,这个库内部实现还是比较复杂的,如果没用过对应库的,建议直接参考我简化的futurist,因为是用eventlet实现的,所以需要熟悉eventlet.

具体的任务代码(比如备份数据库什么的)在一般情况下可以不处理异常,因为执行任务的代码通过except Exception捕获了任务的所有异常.

特殊异常就是CancelledError,这个异常是调度到已经取消任务时由futurist抛出,在读代码的时候需注意下这个的特殊处理

Scheduler

这个没什么好说的,Executor的封装的最上层,最后执行会落实到具体的Executor上

Storage

这个是存储接口,后面说到flow的时候会详细讲到,Storage的初始化在Engine中,一个功能是数据存储的接口,一个功能是作为flow的外层封装

Runtime与machine

在看这个之前,如果你还不熟悉状态机,建议先拿前面说的automaton练练手,如果已经熟悉状态机但是还没看过automaton代码的,建议去看看automaton的代码

machine就是Engine中循环的(automaton)状态机了,一个engine只运行一个状态机,初始化代码在builder.MachineBuilder,MachineBuilder又是在Runtime中调用生成machine的,我们先别管Runtime,先理解一下taskflow的状态机

taskflow状态机并不复杂,但还不熟悉taskflow的时候很容易被搞懵.因为taskflow用到networkx这个图库,而状态机其实就是一个图,所以一开始看的时候,很容易以为taskflow的状态机会非常复杂要看懂图的相关代码才能搞明白,但实际情况是

taskflow的状态机和图无关!因为taskflow状态机的状态很少不需要用图来解决

那么taskflow为什么要用到图库呢,在解决这个疑问前我们先抛开taskflow,自己用状态机设计一个解决前面——”停服 备份数据库 升级数据库 升级应用 启动服务器” 的工作流

首先定义停服状态和对应停服状态执行的代码
定义停服成功的返回,失败的返回,定义进入停服状态的event(这个是起始时间,event就是start)
定义备份数据库状态对应备份执行代码
定义进入备份状态的event(前面的停服成功)
定义备份成功和备份失败的返回,到目前还简单,备份失败大不了多备份几次直到成功,失败了整个状态机终止都可以影响不大
定义升级数据库状态对应备份执行代码
定义进入升级状态的event(前面的备份成功)
定义升级成功和备份失败的返回,这里开始坑了,升级失败要回滚了
发现少了回滚升级失败的状态定义…..增加升级失败回滚失败的定义 …… 回滚升级数据库失败….升级应用是失败…回滚升级应用是失败…..启动失败
设计下来你发现没几个步骤。要定义的状态就越来越多…这也就是状态机复杂以后和图有关的原因了

taskflow非常巧妙的避免了复杂化状态机,taskflow的设计的状态机可以简单的理解只处理2个状态就好

开始….找到任务-执行任务-找到任务….执行任务…终止

执行任务就是调用Executor, 至于找下一个任务的工作,就是封装了图库的flow的工作了.这样设计状态机状态就很少,具体的状态可以看MachineBuilder的注释中有对应表格,对应状态目前粗看一下即可,知道哪个状态是找任务、哪个状态执行任务就可,有些状态涉要看了后面的retry相关才比较好理解,至于flow,这个我们在后面说明

回头来看Runtime,MachineBuilder是由Runtime生成的,状态机的有些callback最终执行的Runtime中的函数,里面会有一些嵌套和封装, Scheduler的封装就在Runtime中,Runtime可以简单理解为状态机调用其它诸如Scheduler,storage等接口的中间件,Runtime在整体理解taskflow的的时候可以不用细看

ds之链表

发表于 2019-01-09 | 分类于 data structure | 阅读次数:
字数统计: 560 | 阅读时长 ≈ 1

链表

对比一下数组 链表这块的话 不需要一块连续的存储空间 只需要“指针”把一组零散的内存块串联起来使用,有效利用了空间

常见的有三种: 单链表、双链表、循环链表

内存块称之为链表的“结点”,为了将所有的节点串联起来,每个链表的节点除了存储数据之外,还需要记录链表的下一个节点的地址,这个记录下个节点地址的指针叫做后继指针next。

双链表节点 还有前驱指针prev

在链表中插入和删除 不需要为了保持内存的连续性而搬移结点,因此插入和删除比较高效。

循环链表是一种特殊的链表 区别在于尾结点

单链表的尾结点指向空地址,而循环链表尾结点指向链表的头结点。像一个环一样首尾相连。

优点在于从链表尾到链表头比较方便,比如约瑟夫问题,使用循环链表就非常方便。

虽然双向链表比较浪费存储空间,但是也带来了 双向的灵活性

java当中的LinkedHashMap这个容器就使用了双链表这种数据结构

为什么使用这个?

一个原因就是使用 空间换时间!!

在内存空间充足的时候,我们使用空间换时间;
内存如果比较紧缺的话,我们就用时间换空间。

缓存本质就是使用空间换时间的设计思想,数据存储在硬盘,比较节省内存。但是查找不是很方便,可以通过缓存的技术,事先将数据加载在内存中,这样虽然比较耗费内存空间,但是每次数据查询的速度就大大提升了。

数组使用连续内存 借助cpu的缓存机制 预读数组中的数据 访问效率更高。数组缺点是大小固定,容易内存不足。

链表中的结点在被频繁插入、删除的操作情况下 容易造成内存碎片,使用java会导致频繁的gc。

ds之数组

发表于 2019-01-09 | 分类于 data structure | 阅读次数:
字数统计: 1.1k | 阅读时长 ≈ 4

为什么数组从零开始

数组是一种线性表数据结构 连续的内存空间来存储一组具有相同类型的数据

数组支持随机访问 根据下标随机访问的时间复杂度为1 适合查找

插入和删除这两个操作比较低效

避免数据搬移 可以先记录下已经删除的数据 每次的删除并不是真正地搬移数据 只是记录数据已经被删除 当数组没有更多空间存储数据时 再触发执行一次真正的删除操作 这样就大大减少了删除操作导致的数据迁移

警惕数组越界的问题 在c语言当中 只要不是访问受限的内存 所有的内存空间都是可以自由访问的 根据数组寻址公式 会定位到某个不属于数组的内存地址上 导致代码的无限循环

访问数组的本质就是访问一段连续内存 只要数组通过偏移计算得到的内存地址是可用的 那么程序有可能不会报任何错误

计算机病毒很多也是利用数组越界访问非法地址的漏洞 来攻击系统 一定要警惕数组越界

c语言当中数组越界检查的工作是給程序员来做的 java则是本身会做越界检查

java提供了容器类 ArrayList

最大的优势是将很多数组的操作封装起来 支持动态扩容

作为高级语言编程者,是不是数组就无用武之地了呢?当然不是

1.Java ArrayList 无法存储基本类型,比如 int long 需要进行封装
2.数据大小事先已知 并且操作简单 可以直接使用数组
3.多维数组情况下 数组更加直观比如 Object[][] array

业务开发情况下 直接使用容器即可 省时省力 做非常底层的开发

性能的优化做到极致 数组优于容器

下标其实是偏移 从1开始计数 每次随机访问元素都需要多一次减法操作

效率优化做到极致?

主要还是历史原因 c语言设计值用0开始计数数组下标。

无论是java js等都是进行模仿

python支持负数下标。

顺便复习学习一下java语言

google家的规范是 2个空格作为缩进!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package array;

/**
* 1) 数组的插入、删除、按照下标随机访问操作;
* 2)数组中的数据是int类型的;
*
*/

public class ArrayOperate {
//定义整型数据data保存数据
public int data[];
//定义数组长度
private int n;
//定义中实际个数
private int count;

//构造方法,定义数组大小
public ArrayOperate(int capacity){
this.data = new int[capacity];
this.n = capacity;
this.count=0;//一开始一个数都没有存所以为0
}

//根据索引,找到数据中的元素并返回
public int find(int index){
if (index<0 || index>=count) return -1;
return data[index];
}

//插入元素:头部插入,尾部插入
public boolean insert(int index, int value){
//数组中无元素

//if (index == count && count == 0) {
// data[index] = value;
// ++count;
// return true;
//}

// 数组空间已满
if (count == n) {
System.out.println("没有可插入的位置");
return false;
}
// 如果count还没满,那么就可以插入数据到数组中
// 位置不合法
if (index < 0||index > count ) {
System.out.println("位置不合法");
return false;
}
// 位置合法
for( int i = count; i > index; --i){
data[i] = data[i - 1];
}
data[index] = value;
++count;
return true;
}
//根据索引,删除数组中元素
public boolean delete(int index){
if (index<0 || index >=count) return false;
//从删除位置开始,将后面的元素向前移动一位
for (int i=index+1; i<count; ++i){
data[i-1] = data[i];
}
//删除数组末尾元素 这段代码不需要也可以
/*int[] arr = new int[count-1];
for (int i=0; i<count-1;i++){
arr[i] = data[i];
}
this.data = arr;*/

--count;
return true;
}
public void printAll() {
for (int i = 0; i < count; ++i) {
System.out.print(data[i] + " ");
}
System.out.println();
}

public static void main(String[] args) {
ArrayOperate array = new ArrayOperate(5);
array.printAll();
array.insert(0, 3);
array.insert(0, 4);
array.insert(1, 5);
array.insert(3, 9);
array.insert(3, 10);
//array.insert(3, 11);
array.printAll();
}


}

程序员的数学之动态规划

发表于 2019-01-05 | 分类于 math | 阅读次数:
字数统计: 292 | 阅读时长 ≈ 1

编辑距离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def get_str_distance(a, b):
"""
使用状态转移方程,计算两个字符串之间的编辑距离
:param a: 第一个字符串
:param b: 第二个字符串
:return: 两者之间的编辑距离
"""
if a is None or b is None:
return -1
# 初始化 用于记录状态转移的二维表 , d 中 0 下标位置对应表示空串
d = [[0 for j in range(len(b) + 1)] for i in range(len(a) + 1)]
# 如果 i 为 0 ,且 j 大于等于 0 ,那么 d[i][j] 为 j
d[0][:] = [j for j in range(len(b) + 1)]
# 如果 i 大于等于 0 ,且 j 为 0 ,那么 d[i][j] 为 i
for i in range(len(a) + 1):
d[i][0] = i

# 实现状态转移方程
# 注意,代码里的状态转移是从 d[i][j] 到 d[i+1][j+1] ,而不是从 d[i-1][j-1] 到 d[i][j],本质上是一样的
for i in range(len(a)):
for j in range(len(b)):
r = 0
if a[i] != b[j]:
r = 1
first_append = d[i][j+1] + 1
second_append = d[i+1][j] + 1
replace = d[i][j] + r

min_d = min(first_append, second_append, replace)
d[i+1][j+1] = min_d

return d[len(a)][len(b)]


if __name__ == "__main__":
print(get_str_distance("mouse", "mouuuse"))
1…555657…109
John Cheung

John Cheung

improve your python skills

543 日志
33 分类
45 标签
RSS
GitHub Email
© 2020 John Cheung
本站访客数:
|
主题 — NexT.Pisces v5.1.4
博客全站共226.3k字