taskflow详细2

Engine的必要初始化参数一个是flow,一个就是backends,backends对应的是storage使用何种后端存储,我简化后的代码storage只支持sqlalchemy,所以不直接传入sqlalchemy的session即可,另外一个参数就是我们马上要说的flow

Engine是通过flow来找到需要执行的task的,所以,我们的task都是add到flow的

1
2
3
4
5
6
7
8
9
10
11
12
13

from simpleflow.patterns import graph_flow as gf

class Adder(task.Task):

def execute(self, x, y):
print 'do!!!', x, y
return x + y

gflow = gf.Flow('root')
gflow.add(Adder(name='atask'))
gflow2 = gf.Flow('leaf')
gflow.add(gflow2)

当然flow的add参数可以是task,还可以flow.

flow的代码比较复杂,关键属性_graph就是networkx的图了,不建议现在读代码,粗读一部分,稍微看一下对应的图和关键变量是什么就好和add部分就好。

flow一共有三种,使用到有向图和无向图,当flow调用add添加的task/flow的时候,就是在图中增加节点,对于有向图,添加节点后自动增加边

这里我就直接用我已经砍掉backends层的代码来说明了,我们来看storage里用到的几个对象(taskflow在persistence包,也就持久化包里,我的代码都集中到storage包里了)

Connection taskflow中,Connection上面还套了一层backends,backends就一个函数get_connection用于返回Connection实例, 因为原来代码可以支持文件、内存等接口,所以有这么一层Connection用于存放通用方法,具体有什么方法可以直接看代码我这里就不描述了,方法的名字很明确的描述了方法的作用

models中LogBook,FlowDetail,AtomDetail 原taskflow的models我改为了middleware,这里的models对应的是tables
这样改是为了统一orm框架写法,orm框架中都在models里放表结构
所以这里的这3个对象对应了三张表
LogBook:一次工作流有且只有一个LogBook,其实条记录用处不大,主要用来识别一次工作流,由于外键关系,删除一条logbook将会删除对应的其他表的记录,工作流正常结束后是可以删除相关记录的
FlowDetail:一次工作流只能接收一个flow,虽然前面我们flow中可以添加flow,但是并不会生成FlowDetail记录,顶层flow直接会获取下层flow的具体atom
AtomDetail: task和retry都是继承自atom,所以AtomDetail的的每一行就对应一个task或retry(retry后面单独说),还有之前说的用于传输初始参数的_TaskFlow_INJECTORD

middleware middleware里的LogBook,FlowDetail,AtomDetail就是models里表的dict版增加了一些方法,TaskDetail/RetryDetail自然也是继承了AtomDetail,它们都是存到AtomDetail里,为什么要拆两份而不统一起来呢?一个原因是原taskflow了因为要支持各种backends,所以分了一层出来。还有一个原因是在没有写入(task初始化,返回,取消,报错)的的情况下,图的相关动作都在内存中进行,直接用orm的对象太过笨重,middleware中的对应对象就轻量很多

现在我们来看Storage类了

我们看看engine里对Storage的初始化

1
2
3
4
5
6
7
8
9
10
11

@misc.cachedproperty
def storage(self):
def _scope_fetcher(atom_name):
if self._compiled:
return self._runtime.fetch_scopes_for(atom_name)
else:
return None
return storage.Storage(self._flow_detail,
connection=self.connection,
scope_fetcher=_scope_fetcher)