当前位置:首页 > 数据库 > 正文内容

Dolphinscheduler DAG中心源码分析

邻居的猫1个月前 (12-09)数据库1352

布景描绘

file

留意 : 在 Dolphinscheduler 中,离线使命是有完好的声明周期的,比如说中止、暂停、暂停康复、重跑等等,都是以DAG(有向无环图的方式进行使命安排)T+1离线使命的。

Dolphinscheduler DAG完成

org.apache.dolphinscheduler.common.graph.DAG

DAG三个重要的数据结构 :

// 极点信息
private final Map<Node, NodeInfo> nodesMap;

// 边相关信息,作用是记载极点和边的联系,能够找到叶子节点,也能够获取下流节点
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

// 反向边相关信息,作用是能够快速找到入度为0的节点(开端节点),也能够获取上游节点
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;

如下示例 :

DAG<String, String, String> graph = new DAG<>();
graph.addNode("A", "A");
graph.addNode("B", "B");
graph.addNode("C", "C");

// 增加一个B -> C的边,当时A还飘着呢
graph.addEdge("B", "C");

// 假如增加A -> B,其实便是会从B开端一直到子节点,看有没有可衔接的线到A,假如有,阐明这个A -> B的边增加不得,由于会构成环,不然就能够增加
graph.addEdge("A", "B");

源码剖析 :
org.apache.dolphinscheduler.common.graph.DAG#addEdge

public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    lock.writeLock().lock();

    try {
        // TODO 是否能够增加该边
        if (!isLegalAddEdge(fromNode, toNode, createNode)) {
            log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
            return false;
        }

        // TODO 增加节点
        addNodeIfAbsent(fromNode, null);
        addNodeIfAbsent(toNode, null);

        // TODO 增加边
        addEdge(fromNode, toNode, edge, edgesMap);
        addEdge(toNode, fromNode, edge, reverseEdgesMap);

        return true;
    } finally {
        lock.writeLock().unlock();
    }
}


private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    // TODO 假如fromNode和toNode两个是同一个极点,这个边是不能增加的
    if (fromNode.equals(toNode)) {
        log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
        return false;
    }

    // TODO 这儿其实便是想说,不是创立节点,也便是说要求fromNode和toNode是需求存在的极点
    if (!createNode) {
        if (!containsNode(fromNode) || !containsNode(toNode)) {
            log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
            return false;
        }
    }

    // Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the
    // DAG has cycle!
    // TODO 这儿获取节点的数量
    int verticesCount = getNodesCount();

    Queue<Node> queue = new LinkedList<>();

    // TODO 将toNode放入到queue中
    queue.add(toNode);

    // if DAG doesn't find fromNode, it's not has cycle!
    // TODO 当queue不为空,这儿必定就不为空了
    while (!queue.isEmpty() && (--verticesCount > 0)) {
        // TODO 获取行列里边的元素
        Node key = queue.poll();

        for (Node subsequentNode : getSubsequentNodes(key)) {
            // TODO 其实这儿判别的是比如说A -> B 有衔接的DAG图,传入的是节点B,看B节点的边是不是有A,假如有A阐明已经有B -> A的相关了,
            // TODO 就不能增加了。假如比如说B的下流节点,比如说 A -> B -> C,这样的话,B的下流节点便是C,C是需求放入queue中的
            // TODO 中心思维其实便是要找到它要增加的方针节点的连线,是否有方针节点到源节点的连线存在(这样来判别是否存在环)
            if (subsequentNode.equals(fromNode)) {
                return false;
            }

            queue.add(subsequentNode);
        }
    }

    return true;
}

Dolphinscheduler DagHelper说明

DAG类是一个根底通用的DAG东西类,而DagHelper是使命界说、使命界说直接的联系组装成DAG的一个事务东西类。

org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph

public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {

    // TODO 这儿其实便是获取的流程实例对应的使命数和之间的联系
    List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
            workflowInstance.getProcessDefinitionCode(),
            workflowInstance.getProcessDefinitionVersion());

    // TODO 获取对应的使命界说log
    List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);

    // TODO 获取TaskNode
    List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);

    // generate process to get DAG info
    // TODO 这儿其实解析的是是否自己手动指定的发动节点列表,默许不会
    List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());

    // TODO 假如 默许startNodeNameList为空
    List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());

    // TODO 构建ProcessDag目标实例
    ProcessDag processDag = DagHelper.generateFlowDag(
            taskNodeList,
            startNodeNameList,
            recoveryTaskNodeCodeList,
            workflowInstance.getTaskDependType());

    if (processDag == null) {
        log.error("ProcessDag is null");
        throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
    }

    // TODO 生成DAG
    DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
    log.debug("Build dag success, dag: {}", dagGraph);

    // TODO 运用WorkflowGraph来封装使命节点列表和dagGraph
    return new WorkflowGraph(taskNodeList, dagGraph);
}

org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag

public static ProcessDag generateFlowDag(
                                             List<TaskNode> totalTaskNodeList,
                                             List<Long> startNodeNameList,
                                             List<Long> recoveryNodeCodeList,
                                             TaskDependType depNodeType) throws Exception {

    // TODO 其实便是拿到一切的节点
    List<TaskNode> destTaskNodeList =
            generateFlowNodeListByStartNode(
                    totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);

    if (destTaskNodeList.isEmpty()) {
        return null;
    }

    // TODO 获取使命节点之前的联系
    List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);

    // TODO 其实便是实例化一个ProcessDag
    ProcessDag processDag = new ProcessDag();
    // TODO 设置DAG的边
    processDag.setEdges(taskNodeRelations);
    // TODO 设置DAG的极点
    processDag.setNodes(destTaskNodeList);
    return processDag;
}

设置了destTaskNodeList和taskNodeRelations

org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph

public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();

    // TODO 增加极点
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getCode(), node);
        }
    }

    // TODO 增加边
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}

转载自 Journey

原文链接:https://segmentfault.com/a/1190000045117764

本文由 白鲸开源 供给发布支撑!

扫描二维码推送至手机访问。

版权声明:本文由51Blog发布,如需转载请注明出处。

本文链接:https://www.51blog.vip/?id=525

标签: 大数据
分享给朋友:

“Dolphinscheduler DAG中心源码分析” 的相关文章

Sql根底

Sql根底

1. sql根底 1.1. 数据库常用的数据类型 1.2. 带n与不带n的差异 1.3. 带var与不带var的差异 1.4. 2.根底操作 1.4.1. 更新句子 1.4.2. 删去句子 1.4.3. 束缚 1.4.4. 修正表结构 1.4.5. 查询表 1.4.6. 含糊查询 _ % [...

MongoDB面试专题33道解析

MongoDB面试专题33道解析

咱们好,我是 V 哥。今日给咱们共享 MongoDB的道 V 哥收拾的面试题,保藏起来,必定会对你有协助。 1. 你说的 NoSQL 数据库是什么意思?NoSQL 与 RDBMS 直接有什么差异?为什么要运用和不运用NoSQL 数据库?说一说 NoSQL 数据库的几个长处? NoSQL("Not...

oracle删除所有表,Oracle数据库中删除所有表的全面指南

Oracle数据库中删除所有表的全面指南在Oracle数据库管理中,有时可能需要删除所有的表,这可能是因为数据库重构、迁移到新版本或者清理不再需要的测试数据。本文将详细介绍如何在Oracle数据库中删除所有表,并提供一些重要的注意事项。准备工作在执行删除所有表的操作之前,以下准备工作是必不可少的:...

北斗大数据,引领时空信息新时代

北斗卫星导航系统(简称北斗系统)是中国自主研发的全球卫星导航系统,旨在为全球用户提供高精度的定位、导航和授时服务。近年来,北斗系统与大数据、物联网、互联网、5G移动通信网、交通网、高铁网、电力网等领域的深度融合,逐步成为信息化网络建设的重要技术手段。1. 终端融合发展:北斗系统通过技术融合创新,在各...

大数据日志分析,二、大数据日志分析概述

大数据日志分析,二、大数据日志分析概述

1. 数据收集: 确定需要分析的日志数据源,例如服务器日志、应用程序日志、网络日志等。 使用日志收集工具(如Fluentd、Logstash等)从各个数据源收集日志数据。2. 数据存储: 将收集到的日志数据存储在适合大数据分析的存储系统中,如Hadoop HDFS、Amazon S...

大数据的4v基本特征包括,揭秘大数据的核心特性

大数据的4v基本特征包括,揭秘大数据的核心特性

大数据的4V基本特征包括:2. Velocity(高速):数据产生和处理的速度非常快。在实时数据流处理场景中,需要快速地收集、处理和分析数据,以便及时做出决策。3. Variety(多样):大数据的来源和格式非常多样化,包括结构化数据(如数据库中的数据)、半结构化数据(如XML文件)和非结构化数据(...