如何编码检查依赖关系是否有循环依赖

Python七号

共 5171字,需浏览 11分钟

 ·

2020-08-30 08:45

什么是永不过时的技能:算法思维。

之前做数据仓库的运维,上线部署时需要处理很多任务的依赖关系,所谓任务,就是一个一个 shell 脚本或者存储过程等批处理任务,他们之间是有依赖关系的,由于数据仓库的任务超级多,约 3000 多个任务,这么多的任务是无法使用一张有向无环图来表示,因此依赖关系除了使用直观的有向连线来配置,还使用了隐藏式的配置,就是依赖关系无法使用有向线条来直观的看到。

既然看不到,就有可能出现循环依赖而不自知,只要有可能,就一定会有人犯错,不是你就是他,不是今天就是未来某一天,这就是墨菲定律。这不,我就经历过。

调度平台用的是先进数通的 MoiaControl V5,这是我用过的最好的调度平台了,之前用过 ETlplus,Airflow。但 MoiaControl 中出现循环依赖并不提示,会导致第二天的任务不会跑批,影响数据的时效性。假如你准备面试先进数通这家公司,说你可以为该产品增加一项检查否有循环依赖的功能,我想这一定是个加分项。

那问题来了,如何编码检查任务依赖关系是否有循环依赖?

答案很简单,就是构造一个有向图,进行拓扑排序,如果拓扑排序后没有未访问的点,那就没有环,否则就有环。

下面,我用 Python 来演示这一解决过程,带你彻底掌握拓扑排序。

首先,我们需要借助一种数据结构来表示有向图,使用方便即可,这里,我使用字典来表示,比如表达 a->b, a->c, c->d 这样的依赖关系,我们可以构造字典 edges = { 'a':{'b','c'},'c':{'d'} } 来表示。字典的键表示前驱任务,字典的值是一个集合,表示依赖前驱的任务集合。这样的字典可以借助于标准库的 collections 来快速初始化:

edges = collections.defaultdict(set)

仅保存边是不够的,我们还需要保存顶点,这可以借助一个集合,它可以自动去重,后面看是否所有的任务节点都参与了拓扑排序,就靠它了。

self.edges = collections.defaultdict(set)
vertex = set()

接下来就是拓扑排序的代码实现了。

拓扑排序一般来说有两种思路,一种是广度优先遍历,借助于先进先出的队列,一种是深度优先遍历,借助于后进先出的栈。无论哪一种思路,都与入度和出度有关。下面分别进行分析。

广度优先遍历比较符合人的习惯思维,从前到后逐层推进。它首先找出不被任何任务依赖的任务进入队列,哪一种任务不被任何任务依赖呢?比如 a->b->c ,a 就是不被任何任务依赖的任务,这样的任务有个特点,就是入度为 0,没有箭头指向的任务的入度就是 0。

首先,我们计算所有节点的入度,把所有入度为 0 的任务依次放入队列,然后开始循环遍历队列,取出第一个任务,记为 a,标记为已访问,同时将依赖于 a 的任务的入度都减少 1,如果减少 1 后入度为 0 的任务放入队列。继续循环,直到所有的节点都被访问。如果循环结束,仍有节点未被遍历,说明存在循环依赖,无论如何他们的入度也不可能为 0。

以上的思路,翻译成代码,如下所示:

import collections

class CheckCycle(object):
    def __init__(self):
        self.vertex = set()  # 顶点集合
        self.edges = collections.defaultdict(
            set
        )  # 使用字典表示有向边 如 a -> {b,c,e} 表示 b,c,e 均依赖 a
        self.indegree = collections.defaultdict(int)  # 计算每个顶点的入度

    def add_edge(self, from_job: str, to_job: str) -> bool:
        """
        添加一条边
        """

        if from_job == to_job:
            return False

        if from_job:
            self.vertex.add(from_job)
            if not from_job in self.indegree:
                self.indegree[from_job] = 0  # 初始化入度为 0
        if to_job:
            self.vertex.add(to_job)
            if not to_job in self.indegree:  # 初始化入度为0
                self.indegree[to_job] = 0
        if from_job and to_job:
            if to_job not in self.edges[from_job]:  # 防止充分添加相同的边
                self.indegree[to_job] += 1  # 入度加 1
                self.edges[from_job].add(to_job)  # 防止充分添加相同的边

        return True

    def can_finish(self) -> bool:
        """
        Returns:
            True: 表示没有环,任务可以完成
            False: 表示有环,任务不可以完成
        """


        q = collections.deque([u for u in self.indegree if self.indegree[u] == 0])
        visited = 0

        possible_sequence = []

        while q:
            visited += 1
            u = q.popleft()
            possible_sequence.append(u)
            for v in self.edges[u]:
                self.indegree[v] -= 1
                if self.indegree[v] == 0:
                    q.append(v)
        print(f'possible sequence: {"->".join(possible_sequence)}')
        return visited == len(self.vertex)


if __name__ == "__main__":
    """
    a->b->c
       b->d
       True
    a->b->c
       b->d->a
       False
    """

    check_cycle = CheckCycle()
    check_cycle.add_edge(from_job="a", to_job="b")
    check_cycle.add_edge(from_job="b", to_job="c")
    check_cycle.add_edge(from_job="b", to_job="d")
    print(check_cycle.can_finish())

    check_cycle = CheckCycle()
    check_cycle.add_edge(from_job="a", to_job="b")
    check_cycle.add_edge(from_job="b", to_job="c")
    check_cycle.add_edge(from_job="b", to_job="d")
    check_cycle.add_edge(from_job="d", to_job="a")
    print(check_cycle.can_finish())

时间复杂度和空间复杂度的分析同广度优先遍历算法,都是 O(m+n), m 是顶点数,n 是边数,不在赘述。

另一种方法就是深度优先遍历, 深度优先遍历则是一种逆向思维,为了简单的理解,先考虑没有环的情况,如 a->b->c-d :从任意任务节点出发,假如是 b,一条路走到黑,遍历到最后一个节点 d ,将其入栈,同时将已经访问过的节点标记为已访问(b,c 已访问),将已入栈的节点标记为已完成(d 已完成),还没有访问过的节点标记为未访问 (a 未访问)。也就是说任何一个节点,只会有以下三种状态:

  • 「未访问」:我们还没有访问到这个节点,使用 0 来表示。

  • 「已访问」:我们访问过这个节点,但还没有回溯到该节点,即该节点还没有入栈,还有相邻的节点没有完成,使用 1 来表示。

  • 「已完成」:我们访问过且回溯过这个节点,即该节点已经入栈,并且所有该节点的后续节点都出现在栈的更底部的位置,满足拓扑排序的要求,使用 2 来表示。

现在回溯到 c,发现 c 已访问,且 c 的后续节点 d 已经完成,因此将 c 入栈,标记为已完成,依次类推,现在,栈底到栈顶依次为 d,c,b。然后从剩余节点 a 出发,执行同样的逻辑,a 也入栈,标记为完成,最终从栈底到栈顶为 d,c,b,a,将这些节点依次出栈,即为拓扑排序。

现在考虑有环的情况 a->b->c->d->b,访问到 d 时,继续访问 b 发现 b 已经被访问,说明有环,退出即可。根据上面的分析,不难写出以下深度遍历的代码:

def can_finish2(self) -> bool:
    """
    深度优先遍历
    Returns:
        True: 表示没有环,任务可以完成
        False: 表示有环,任务不可以完成
    """


    visited = collections.defaultdict(int)  # 保存每个顶点是否被访问过
    for job in self.vertex:
        visited[job] = 0  # 初始化,均未被访问过

    result = list()  # 模拟栈
    valid = True

    def dfs(from_job: str):
        nonlocal valid
        visited[from_job] = 1
        for to_job in self.edges[from_job]:
            if visited[to_job] == 0:
                dfs(to_job)
                if not valid:
                    return
            elif visited[to_job] == 1:
                valid = False
                return
        visited[from_job] = 2
        result.append(from_job)

    for job in self.vertex:
        if valid and not visited[job]:
            dfs(job)

    # print(result)
    return valid

时间复杂度即为深度优先遍历或广度优先遍历的时间复杂度,都为 O(m+n) ,其中 m 是顶点数,n 是边数,对应着任务数和任务的依赖数。

其实即使写不出深度优先或广度优先的代码关系也不大,只有会灵活使用就行,网上都是现成的代码,最重要的是要理解这些代码,为我所用。

想使用代码时不必辛苦的复制,回复「拓扑排序」获取可执行代码。

感谢你的点赞支持。

浏览 51
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报