上一篇文中我们实现了顺序的工作流,对于多分支的工作流如下,该如何实现呢?
小明提交了一个申请单,然后经过经理审批,如果通过,发邮件通知,不通过,则打回重新填写申请单,直到通过为止
1、分析
- 需要引入一种分支节点,可以进行简单的二选一流转
- 节点的入边、出边不只一条
- 需要一种逻辑表达式语义,可以配置分支节点
2、设计
- 节点要支持多入边、多出边
- 节点算子来决定从哪个出边出
- 使用一种简单的规则引擎,支持简单的逻辑表达式的解析
- 简单分支节点的XML定义
3、实现
XML定义如下:
<definitions>
<process id="process_2" name="简单审批例子">
<startEvent id="startEvent_1">
<outgoing>flow_1</outgoing>
</startEvent>
<sequenceFlow id="flow_1" sourceRef="startEvent_1" targetRef="approvalApply_1"/>
<approvalApply id="approvalApply_1" name="提交申请单">
<incoming>flow_1</incoming>
<incoming>flow_5</incoming>
<outgoing>flow_2</outgoing>
</approvalApply>
<sequenceFlow id="flow_2" sourceRef="approvalApply_1" targetRef="approval_1"/>
<approval id="approval_1" name="审批">
<incoming>flow_2</incoming>
<outgoing>flow_3</outgoing>
</approval>
<sequenceFlow id="flow_3" sourceRef="approval_1" targetRef="simpleGateway_1"/>
<simpleGateway id="simpleGateway_1" name="简单是非判断">
<trueOutGoing>flow_4</trueOutGoing>
<expr>approvalResult</expr>
<incoming>flow_3</incoming>
<outgoing>flow_4</outgoing>
<outgoing>flow_5</outgoing>
</simpleGateway>
<sequenceFlow id="flow_5" sourceRef="simpleGateway_1" targetRef="approvalApply_1"/>
<sequenceFlow id="flow_4" sourceRef="simpleGateway_1" targetRef="notify_1"/>
<notify id="notify_1" name="结果邮件通知">
<incoming>flow_4</incoming>
<outgoing>flow_6</outgoing>
</notify>
<sequenceFlow id="flow_6" sourceRef="notify_1" targetRef="endEvent_1"/>
<endEvent id="endEvent_1">
<incoming>flow_6</incoming>
</endEvent>
</process>
</definitions>
加入了simpleGateway这个简单分支节点,用于表示简单的二选一分支,当expr中的表达式为真时,走trueOutGoing中的出边,否则走另一个出边。
节点支持多入边、多出边,修改后的PeNode如下:
import org.w3c.dom.Node;
import java.util.ArrayList;
import java.util.List;
public class PeNode {
public String id;
public String type;
public List<PeEdge> in = new ArrayList<>();
public List<PeEdge> out = new ArrayList<>();
public Node xmlNode;
public PeNode(String id) {
this.id = id;
}
public PeEdge onlyOneOut() {
return out.get(0);
}
public PeEdge outWithID(String nextPeEdgeID) {
return out.stream().filter(e -> e.id.equals(nextPeEdgeID)).findFirst().get();
}
public PeEdge outWithOutID(String nextPeEdgeID) {
return out.stream().filter(e -> !e.id.equals(nextPeEdgeID)).findFirst().get();
}
}
以前只有一个出边时,是由当前节点来决定下一节点的,现在多出边了,该由边来决定下一个节点是什么,修改后的流程引擎代码如下:
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class XmlPeProcessBuilder {
private String xmlStr;
private final Map<String, PeNode> id2PeNode = new HashMap<>();
private final Map<String, PeEdge> id2PeEdge = new HashMap<>();
public XmlPeProcessBuilder(String xmlStr) {
this.xmlStr = xmlStr;
}
public PeProcess build() throws Exception {
//strToNode : 把一段xml转换为org.w3c.dom.Node
Node definations = XmlUtil.strToNode(xmlStr);
//childByName : 找到definations子节点中nodeName为process的那个Node
Node process = XmlUtil.childByName(definations, "process");
NodeList childNodes = process.getChildNodes();
for (int j = 0; j < childNodes.getLength(); j++) {
Node node = childNodes.item(j);
//#text node should be skip
if (node.getNodeType() == Node.TEXT_NODE) continue;
if ("sequenceFlow".equals(node.getNodeName()))
buildPeEdge(node);
else
buildPeNode(node);
}
Map.Entry<String, PeNode> startEventEntry = id2PeNode.entrySet().stream().filter(entry -> "startEvent".equals(entry.getValue().type)).findFirst().get();
return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue());
}
private void buildPeEdge(Node node) {
//attributeValue : 找到node节点上属性为id的值
PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id));
peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id));
peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id));
}
private void buildPeNode(Node node) {
PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id));
peNode.type = node.getNodeName();
peNode.xmlNode = node;
List<Node> inPeEdgeNodes = XmlUtil.childsByName(node, "incoming");
inPeEdgeNodes.stream().forEach(n -> peNode.in.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id))));
List<Node> outPeEdgeNodes = XmlUtil.childsByName(node, "outgoing");
outPeEdgeNodes.stream().forEach(n -> peNode.out.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id))));
}
}
新加入的simpleGateway节点算子如下:
/**
* 简单是非判断
*/
public class OperatorOfSimpleGateway implements IOperator {
@Override
public String getType() {
return "simpleGateway";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
ScriptEngineManager manager = new ScriptEngineManager();
ScriptEngine engine = manager.getEngineByName("js");
engine.put("approvalResult", peContext.getValue("approvalResult"));
String expression = XmlUtil.childTextByName(node.xmlNode, "expr");
String trueOutGoingEdgeID = XmlUtil.childTextByName(node.xmlNode, "trueOutGoing");
PeEdge outPeEdge = null;
try {
outPeEdge = (Boolean) engine.eval(expression) ?
node.outWithID(trueOutGoingEdgeID) : node.outWithOutID(trueOutGoingEdgeID);
} catch (ScriptException e) {
e.printStackTrace();
}
processEngine.nodeFinished(outPeEdge);
}
}
简单使用了js脚本作为表达式
其他算子相比V1也有相应变化
OperatorOfApproval.java
public class OperatorOfApproval implements IOperator {
@Override
public String getType() {
return "approval";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
peContext.putValue("approver", "经理");
Integer price = (Integer) peContext.getValue("price");
//价格<=200审批才通过,即:approvalResult=true
boolean approvalResult = price <= 200;
peContext.putValue("approvalResult", approvalResult);
System.out.println("approvalResult : " + approvalResult + ",price : " + price);
processEngine.nodeFinished(node.onlyOneOut());
}
}
OperatorOfApprovalApply.java
public class OperatorOfApprovalApply implements IOperator {
public static int price = 500;
@Override
public String getType() {
return "approvalApply";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
//price每次减100
peContext.putValue("price", price -= 100);
peContext.putValue("applicant", "小张");
processEngine.nodeFinished(node.onlyOneOut());
}
}
OperatorOfNotify.java
/**
* 结果邮件通知
*/
public class OperatorOfNotify implements IOperator {
@Override
public String getType() {
return "notify";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
System.out.println(String.format("%s 提交的申请单 %s 被 %s 审批,结果为 %s",
peContext.getValue("applicant"),
peContext.getValue("price"),
peContext.getValue("approver"),
peContext.getValue("approvalResult")));
processEngine.nodeFinished(node.onlyOneOut());
}
}
测试类
import org.junit.Test;
public class ProcessEngineTest {
@Test
public void testRun() throws Exception {
//读取文件内容到字符串
String modelStr = DomUtils.XmlToString("E:\\gitee\\springboot-demo\\src\\main\\java\\com\\example\\demo\\enginer\\xml\\v2.xml");
ProcessEngine processEngine = new ProcessEngine(modelStr);
processEngine.registNodeProcessor(new OperatorOfApproval());
processEngine.registNodeProcessor(new OperatorOfApprovalApply());
processEngine.registNodeProcessor(new OperatorOfNotify());
processEngine.registNodeProcessor(new OperatorOfSimpleGateway());
processEngine.start();
Thread.sleep(1000 * 1);
}
}
运行一下,输出结果如下:
approvalResult : false,price : 400
approvalResult : false,price : 300
approvalResult : true,price : 200
小明 提交的申请单 200 被 经理 审批,结果为 true
process finished!
4、总结
至此,我们实现了顺序和分支语义的工作流,整个引擎的结构如下:
通过此图我们可知,这里有一个相对稳定的引擎层,同时为了提供扩展性,提供了一个节点算子层,所有的节点算子的新增都在此处中。
此外,进行了一定程度的控制反转,即:由算子决定下一步走哪里,而不是引擎。这样,极大地提高了引擎的灵活性,更好的进行了封装。
最后,使用了上下文,提供了一种全局变量的机制,便于节点之间的数据流动。
当然,以上的三个迭代距离实际的线上应用场景相距甚远,还需实现与展望以下几点才可,如下:
- 前端的画布、前后端流程数据结构定义及转换
- 应把节点抽象成一个函数,要有入参、出参,数据类型等
- 流程图的动态修改,即:可以在流程开始后,对流程图进行修改
- 流程图的语义合法性检查,xsd、自定义检查技术等
- 流程的取消、重置、变量传入等
- 流程的流程历史记录,及回滚到任意节点
- 防止重启后流转信息丢失,需要持久化机制的加入
- 一些异常情况的考虑与设计
- 并发修改情况下的考虑
- 更合适的规则引擎及多种规则引擎的实现、配置
- 关键的地方加入埋点,用以控制引擎或吐出事件