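DolphinScheduler DAG Core Source Code Analysis
Background
Note: In DolphinScheduler, offline tasks have a complete lifecycle (stop, pause, resume from pause, rerun, and so on). These T+1 offline tasks are all orchestrated as a DAG (directed acyclic graph).
DAG implementation in DolphinScheduler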
org.apache.dolphinscheduler.common.graph.DAG
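The DAG class has three key data structures:
// Vertex information
private final Map<Node, NodeInfo> nodesMap;
// Edge information: records which vertices each edge connects; used to find leaf nodes and to get downstream nodes
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;
// Reverse edge information: used to quickly find nodes with in-degree 0 (start nodes) and to get upstream nodes
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
For example:
DAG<String, String, String> graph = new DAG<>();
graph.addNode("A", "A");
graph.addNode("B", "B");
graph.addNode("C", "C");
// Add an edge B -> C; at this point A is still unconnected
graph.addEdge("B", "C");
// When adding A -> B, the check walks from B down through its descendants to see whether any path leads back to A.
// If such a path exists, A -> B cannot be added because it would form a cycle; otherwise it can be added.
graph.addEdge("A", "B");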
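To make the roles of the three maps concrete, here is a minimal sketch that fills plain `HashMap`s for the same A -> B -> C example. It is not the DolphinScheduler class: `ThreeMapsSketch` and all of its methods are hypothetical, edge info is reduced to a `String` label, and no cycle check is performed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the three maps; not the real DAG class.
class ThreeMapsSketch {
    // vertex -> vertex info
    final Map<String, String> nodesMap = new HashMap<>();
    // from -> (to -> edge info): downstream lookups and leaf detection
    final Map<String, Map<String, String>> edgesMap = new HashMap<>();
    // to -> (from -> edge info): upstream lookups and start-node detection
    final Map<String, Map<String, String>> reverseEdgesMap = new HashMap<>();

    void addNode(String node, String info) {
        nodesMap.put(node, info);
    }

    // No cycle check here; this only shows how both edge maps are filled.
    void addEdge(String from, String to) {
        String label = from + "->" + to;
        edgesMap.computeIfAbsent(from, k -> new HashMap<>()).put(to, label);
        reverseEdgesMap.computeIfAbsent(to, k -> new HashMap<>()).put(from, label);
    }

    // Start nodes have in-degree 0, so they never appear as a key in reverseEdgesMap.
    Set<String> startNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(reverseEdgesMap.keySet());
        return result;
    }

    // Leaf nodes have out-degree 0, so they never appear as a key in edgesMap.
    Set<String> leafNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(edgesMap.keySet());
        return result;
    }

    Set<String> downstreamOf(String node) {
        return new TreeSet<>(edgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }

    Set<String> upstreamOf(String node) {
        return new TreeSet<>(reverseEdgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }

    // Builds the A -> B -> C graph from the example.
    static ThreeMapsSketch example() {
        ThreeMapsSketch g = new ThreeMapsSketch();
        g.addNode("A", "A");
        g.addNode("B", "B");
        g.addNode("C", "C");
        g.addEdge("B", "C");
        g.addEdge("A", "B");
        return g;
    }

    public static void main(String[] args) {
        ThreeMapsSketch g = example();
        System.out.println("start nodes: " + g.startNodes());
        System.out.println("leaf nodes: " + g.leafNodes());
        System.out.println("downstream of B: " + g.downstreamOf("B"));
        System.out.println("upstream of B: " + g.upstreamOf("B"));
    }
}
```

In this sketch the start node is A and the leaf node is C. Keeping the reverse map costs one extra write per edge, but it turns upstream and in-degree queries into direct lookups.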
Source code analysis:
org.apache.dolphinscheduler.common.graph.DAG#addEdge
public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    lock.writeLock().lock();
    try {
        // Check whether this edge can be added
        if (!isLegalAddEdge(fromNode, toNode, createNode)) {
            log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
            return false;
        }
        // Add the nodes if they are absent
        addNodeIfAbsent(fromNode, null);
        addNodeIfAbsent(toNode, null);
        // Add the edge to both the forward map and the reverse map
        addEdge(fromNode, toNode, edge, edgesMap);
        addEdge(toNode, fromNode, edge, reverseEdgesMap);
        return true;
    } finally {
        lock.writeLock().unlock();
    }
}
private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    // If fromNode and toNode are the same vertex, the edge cannot be added
    if (fromNode.equals(toNode)) {
        log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
        return false;
    }
    // When createNode is false, both fromNode and toNode must already exist as vertices
    if (!createNode) {
        if (!containsNode(fromNode) || !containsNode(toNode)) {
            log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
            return false;
        }
    }
    // Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the
    // DAG has cycle!
    // Get the number of nodes
    int verticesCount = getNodesCount();
    Queue<Node> queue = new LinkedList<>();
    // Put toNode into the queue
    queue.add(toNode);
    // if DAG doesn't find fromNode, it's not has cycle!
    // Loop while the queue is not empty; on the first pass it is always non-empty because toNode was just added
    while (!queue.isEmpty() && (--verticesCount > 0)) {
        // Take the next element from the queue
        Node key = queue.poll();
        for (Node subsequentNode : getSubsequentNodes(key)) {
            // Example: when adding A -> B, the traversal starts at B (toNode). If one of B's downstream nodes is A (fromNode),
            // a path B -> A already exists, so the edge is rejected. Otherwise each downstream node (e.g. C in B -> C) is queued
            // so that its own downstream nodes are checked in turn.
            // Core idea: look for an existing path from toNode back to fromNode; if one exists, the new edge would close a cycle.
            if (subsequentNode.equals(fromNode)) {
                return false;
            }
            queue.add(subsequentNode);
        }
    }
    return true;
}
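The reachability check above can be tried in isolation with the following sketch. It is a simplified stand-in rather than the DolphinScheduler code: `CycleCheckSketch` and its members are hypothetical, nodes are plain strings, and the `createNode` flag is omitted (both endpoints must exist). One deliberate substitution: instead of bounding the loop with a vertex counter as the quoted method does, the sketch tracks visited nodes so that each node is expanded at most once.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the "would this edge close a cycle?" check.
class CycleCheckSketch {
    private final Set<String> nodes = new HashSet<>();
    private final Map<String, Set<String>> edges = new HashMap<>();

    void addNode(String node) {
        nodes.add(node);
    }

    // Adds from -> to only if the edge is legal; returns whether it was added.
    boolean addEdge(String from, String to) {
        if (!isLegalAddEdge(from, to)) {
            return false;
        }
        edges.computeIfAbsent(from, k -> new HashSet<>()).add(to);
        return true;
    }

    boolean isLegalAddEdge(String from, String to) {
        // A self-loop is always a cycle.
        if (from.equals(to)) {
            return false;
        }
        // Both endpoints must already be vertices (no createNode option in this sketch).
        if (!nodes.contains(from) || !nodes.contains(to)) {
            return false;
        }
        // Breadth-first search from `to`: if `from` is reachable, adding from -> to closes a cycle.
        Queue<String> queue = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        queue.add(to);
        visited.add(to);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : edges.getOrDefault(current, Collections.emptySet())) {
                if (next.equals(from)) {
                    return false;
                }
                // Set.add returns false for nodes seen before, so each node is queued once.
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        CycleCheckSketch g = new CycleCheckSketch();
        g.addNode("A");
        g.addNode("B");
        g.addNode("C");
        System.out.println(g.addEdge("B", "C")); // true
        System.out.println(g.addEdge("A", "B")); // true
        System.out.println(g.addEdge("C", "A")); // false: A -> B -> C already exists
    }
}
```

The search starts at the target of the new edge because a cycle can only appear if the target already reaches the source.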
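DolphinScheduler DagHelper explained
The DAG class is a basic, general-purpose DAG utility class, whereas DagHelper is a business-level utility class that assembles task definitions and the relations between them into a DAG.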
org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph
public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {
    // Load the task relations of the workflow definition (code and version) behind this workflow instance
    List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
            workflowInstance.getProcessDefinitionCode(),
            workflowInstance.getProcessDefinitionVersion());
    // Load the corresponding task definition logs
    List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);
    // Convert them into TaskNode objects
    List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
    // generate process to get DAG info
    // Parse any manually specified node lists from the command parameters; by default none are specified
    List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());
    // By default startNodeNameList is empty
    List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());
    // Build the ProcessDag instance
    ProcessDag processDag = DagHelper.generateFlowDag(
            taskNodeList,
            startNodeNameList,
            recoveryTaskNodeCodeList,
            workflowInstance.getTaskDependType());
    if (processDag == null) {
        log.error("ProcessDag is null");
        throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
    }
    // Build the DAG
    DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
    log.debug("Build dag success, dag: {}", dagGraph);
    // Wrap the task node list and dagGraph in a WorkflowGraph
    return new WorkflowGraph(taskNodeList, dagGraph);
}
org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag
public static ProcessDag generateFlowDag(
        List<TaskNode> totalTaskNodeList,
        List<Long> startNodeNameList,
        List<Long> recoveryNodeCodeList,
        TaskDependType depNodeType) throws Exception {
    // Get all the nodes to include in the DAG
    List<TaskNode> destTaskNodeList =
            generateFlowNodeListByStartNode(
                    totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
    if (destTaskNodeList.isEmpty()) {
        return null;
    }
    // Get the relations between the task nodes
    List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);
    // Instantiate a ProcessDag
    ProcessDag processDag = new ProcessDag();
    // Set the DAG edges
    processDag.setEdges(taskNodeRelations);
    // Set the DAG vertices
    processDag.setNodes(destTaskNodeList);
    return processDag;
}
The resulting ProcessDag carries destTaskNodeList as its nodes and taskNodeRelations as its edges.
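The body of generateRelationListByFlowNodes is not quoted in this article, so the following is only a rough sketch of how a relation list might be derived from a node list. It assumes that each node records the codes of its upstream tasks and that relations pointing outside the selected node list are dropped. `RelationSketch`, `Node`, `Relation`, and `relationsOf` are hypothetical names, not DolphinScheduler types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: derive (start -> end) relations from nodes that know their upstream codes.
class RelationSketch {

    // Assumed shape of a task node: its own code plus the codes of its upstream tasks.
    static final class Node {
        final long code;
        final List<Long> upstreamCodes;

        Node(long code, Long... upstreamCodes) {
            this.code = code;
            this.upstreamCodes = Arrays.asList(upstreamCodes);
        }
    }

    // A directed relation between two task codes.
    static final class Relation {
        final long startNode;
        final long endNode;

        Relation(long startNode, long endNode) {
            this.startNode = startNode;
            this.endNode = endNode;
        }

        @Override
        public String toString() {
            return startNode + "->" + endNode;
        }
    }

    static List<Relation> relationsOf(List<Node> nodes) {
        // Only codes in the selected node list may appear as endpoints.
        Set<Long> selected = new HashSet<>();
        for (Node node : nodes) {
            selected.add(node.code);
        }
        List<Relation> relations = new ArrayList<>();
        for (Node node : nodes) {
            for (Long upstream : node.upstreamCodes) {
                // Skip upstream tasks that are not part of this run (assumption of this sketch).
                if (selected.contains(upstream)) {
                    relations.add(new Relation(upstream, node.code));
                }
            }
        }
        return relations;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node(1L),
                new Node(2L, 1L),
                new Node(3L, 2L, 99L)); // 99 is not in the list, so 99->3 is dropped
        System.out.println(relationsOf(nodes));
    }
}
```

Filtering by the selected codes keeps every relation's endpoints inside the node list, which matters when only part of a workflow is run.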
org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph
public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
    DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();
    // Add the vertices
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getCode(), node);
        }
    }
    // Add the edges
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}
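buildDagGraph adds all vertices before any edge. The sketch below illustrates why that order matters when an edge is only accepted if both endpoints already exist, as in the createNode == false branch of isLegalAddEdge. It assumes the two-argument addEdge behaves that way, which this article does not show. `BuildOrderSketch` and its methods are hypothetical, and the cycle check is left out to keep the focus on ordering.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a two-phase build: vertices first, then edges.
class BuildOrderSketch {
    private final Map<Long, String> nodes = new LinkedHashMap<>();
    private final Map<Long, Set<Long>> edges = new LinkedHashMap<>();

    void addNode(long code, String info) {
        nodes.put(code, info);
    }

    // Mirrors the createNode == false rule: both endpoints must already be vertices.
    boolean addEdge(long from, long to) {
        if (from == to || !nodes.containsKey(from) || !nodes.containsKey(to)) {
            return false;
        }
        edges.computeIfAbsent(from, k -> new LinkedHashSet<>()).add(to);
        return true;
    }

    int edgeCount() {
        int count = 0;
        for (Set<Long> targets : edges.values()) {
            count += targets.size();
        }
        return count;
    }

    // Phase 1 adds every vertex, phase 2 adds every edge.
    static BuildOrderSketch build(Map<Long, String> nodeInfos, long[][] relations) {
        BuildOrderSketch graph = new BuildOrderSketch();
        for (Map.Entry<Long, String> entry : nodeInfos.entrySet()) {
            graph.addNode(entry.getKey(), entry.getValue());
        }
        for (long[] relation : relations) {
            graph.addEdge(relation[0], relation[1]);
        }
        return graph;
    }

    public static void main(String[] args) {
        Map<Long, String> infos = new LinkedHashMap<>();
        infos.put(1L, "task-1");
        infos.put(2L, "task-2");
        infos.put(3L, "task-3");
        BuildOrderSketch graph = build(infos, new long[][] {{1L, 2L}, {2L, 3L}});
        System.out.println(graph.edgeCount()); // 2

        // Adding an edge before its vertices exist is rejected.
        BuildOrderSketch empty = new BuildOrderSketch();
        System.out.println(empty.addEdge(1L, 2L)); // false
    }
}
```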
Reposted from Journey
Original link: https://segmentfault.com/a/1190000045117764