如何编码检查依赖关系是否有循环依赖
什么是永不过时的技能:算法思维。
之前做数据仓库的运维,上线部署时需要处理很多任务的依赖关系,所谓任务,就是一个一个 shell 脚本或者存储过程等批处理任务,他们之间是有依赖关系的,由于数据仓库的任务超级多,约 3000 多个任务,这么多的任务是无法使用一张有向无环图来表示,因此依赖关系除了使用直观的有向连线来配置,还使用了隐藏式的配置,就是依赖关系无法使用有向线条来直观的看到。
既然看不到,就有可能出现循环依赖而不自知,只要有可能,就一定会有人犯错,不是你就是他,不是今天就是未来某一天,这就是墨菲定律。这不,我就经历过。
调度平台用的是先进数通的 MoiaControl V5,这是我用过的最好的调度平台了,之前用过 ETlplus,Airflow。但 MoiaControl 中出现循环依赖并不提示,会导致第二天的任务不会跑批,影响数据的时效性。假如你准备面试先进数通这家公司,说你可以为该产品增加一项检查否有循环依赖的功能,我想这一定是个加分项。
那问题来了,如何编码检查任务依赖关系是否有循环依赖?
答案很简单,就是构造一个有向图,进行拓扑排序,如果拓扑排序后没有未访问的点,那就没有环,否则就有环。
下面,我用 Python 来演示这一解决过程,带你彻底掌握拓扑排序。
首先,我们需要借助一种数据结构来表示有向图,使用方便即可,这里,我使用字典来表示,比如表达 a->b, a->c, c->d 这样的依赖关系,我们可以构造字典 edges = { 'a':{'b','c'},'c':{'d'} } 来表示。字典的键表示前驱任务,字典的值是一个集合,表示依赖前驱的任务集合。这样的字典可以借助于标准库的 collections 来快速初始化:
edges = collections.defaultdict(set)
仅保存边是不够的,我们还需要保存顶点,这可以借助一个集合,它可以自动去重,后面看是否所有的任务节点都参与了拓扑排序,就靠它了。
self.edges = collections.defaultdict(set)
vertex = set()
接下来就是拓扑排序的代码实现了。
拓扑排序一般来说有两种思路,一种是广度优先遍历,借助于先进先出的队列,一种是深度优先遍历,借助于后进先出的栈。无论哪一种思路,都与入度和出度有关。下面分别进行分析。
广度优先遍历比较符合人的习惯思维,从前到后逐层推进。它首先找出不被任何任务依赖的任务进入队列,哪一种任务不被任何任务依赖呢?比如 a->b->c ,a 就是不被任何任务依赖的任务,这样的任务有个特点,就是入度为 0,没有箭头指向的任务的入度就是 0。
首先,我们计算所有节点的入度,把所有入度为 0 的任务依次放入队列,然后开始循环遍历队列,取出第一个任务,记为 a,标记为已访问,同时将依赖于 a 的任务的入度都减少 1,如果减少 1 后入度为 0 的任务放入队列。继续循环,直到所有的节点都被访问。如果循环结束,仍有节点未被遍历,说明存在循环依赖,无论如何他们的入度也不可能为 0。
以上的思路,翻译成代码,如下所示:
import collections
class CheckCycle(object):
def __init__(self):
self.vertex = set() # 顶点集合
self.edges = collections.defaultdict(
set
) # 使用字典表示有向边 如 a -> {b,c,e} 表示 b,c,e 均依赖 a
self.indegree = collections.defaultdict(int) # 计算每个顶点的入度
def add_edge(self, from_job: str, to_job: str) -> bool:
"""
添加一条边
"""
if from_job == to_job:
return False
if from_job:
self.vertex.add(from_job)
if not from_job in self.indegree:
self.indegree[from_job] = 0 # 初始化入度为 0
if to_job:
self.vertex.add(to_job)
if not to_job in self.indegree: # 初始化入度为0
self.indegree[to_job] = 0
if from_job and to_job:
if to_job not in self.edges[from_job]: # 防止充分添加相同的边
self.indegree[to_job] += 1 # 入度加 1
self.edges[from_job].add(to_job) # 防止充分添加相同的边
return True
def can_finish(self) -> bool:
"""
Returns:
True: 表示没有环,任务可以完成
False: 表示有环,任务不可以完成
"""
q = collections.deque([u for u in self.indegree if self.indegree[u] == 0])
visited = 0
possible_sequence = []
while q:
visited += 1
u = q.popleft()
possible_sequence.append(u)
for v in self.edges[u]:
self.indegree[v] -= 1
if self.indegree[v] == 0:
q.append(v)
print(f'possible sequence: {"->".join(possible_sequence)}')
return visited == len(self.vertex)
if __name__ == "__main__":
"""
a->b->c
b->d
True
a->b->c
b->d->a
False
"""
check_cycle = CheckCycle()
check_cycle.add_edge(from_job="a", to_job="b")
check_cycle.add_edge(from_job="b", to_job="c")
check_cycle.add_edge(from_job="b", to_job="d")
print(check_cycle.can_finish())
check_cycle = CheckCycle()
check_cycle.add_edge(from_job="a", to_job="b")
check_cycle.add_edge(from_job="b", to_job="c")
check_cycle.add_edge(from_job="b", to_job="d")
check_cycle.add_edge(from_job="d", to_job="a")
print(check_cycle.can_finish())
时间复杂度和空间复杂度的分析同广度优先遍历算法,都是 O(m+n), m 是顶点数,n 是边数,不在赘述。
另一种方法就是深度优先遍历, 深度优先遍历则是一种逆向思维,为了简单的理解,先考虑没有环的情况,如 a->b->c-d :从任意任务节点出发,假如是 b,一条路走到黑,遍历到最后一个节点 d ,将其入栈,同时将已经访问过的节点标记为已访问(b,c 已访问),将已入栈的节点标记为已完成(d 已完成),还没有访问过的节点标记为未访问 (a 未访问)。也就是说任何一个节点,只会有以下三种状态:
「未访问」:我们还没有访问到这个节点,使用 0 来表示。
「已访问」:我们访问过这个节点,但还没有回溯到该节点,即该节点还没有入栈,还有相邻的节点没有完成,使用 1 来表示。
「已完成」:我们访问过且回溯过这个节点,即该节点已经入栈,并且所有该节点的后续节点都出现在栈的更底部的位置,满足拓扑排序的要求,使用 2 来表示。
现在回溯到 c,发现 c 已访问,且 c 的后续节点 d 已经完成,因此将 c 入栈,标记为已完成,依次类推,现在,栈底到栈顶依次为 d,c,b。然后从剩余节点 a 出发,执行同样的逻辑,a 也入栈,标记为完成,最终从栈底到栈顶为 d,c,b,a,将这些节点依次出栈,即为拓扑排序。
现在考虑有环的情况 a->b->c->d->b,访问到 d 时,继续访问 b 发现 b 已经被访问,说明有环,退出即可。根据上面的分析,不难写出以下深度遍历的代码:
def can_finish2(self) -> bool:
"""
深度优先遍历
Returns:
True: 表示没有环,任务可以完成
False: 表示有环,任务不可以完成
"""
visited = collections.defaultdict(int) # 保存每个顶点是否被访问过
for job in self.vertex:
visited[job] = 0 # 初始化,均未被访问过
result = list() # 模拟栈
valid = True
def dfs(from_job: str):
nonlocal valid
visited[from_job] = 1
for to_job in self.edges[from_job]:
if visited[to_job] == 0:
dfs(to_job)
if not valid:
return
elif visited[to_job] == 1:
valid = False
return
visited[from_job] = 2
result.append(from_job)
for job in self.vertex:
if valid and not visited[job]:
dfs(job)
# print(result)
return valid
时间复杂度即为深度优先遍历或广度优先遍历的时间复杂度,都为 O(m+n) ,其中 m 是顶点数,n 是边数,对应着任务数和任务的依赖数。
其实即使写不出深度优先或广度优先的代码关系也不大,只有会灵活使用就行,网上都是现成的代码,最重要的是要理解这些代码,为我所用。
想使用代码时不必辛苦的复制,回复「拓扑排序」获取可执行代码。
感谢你的点赞支持。