为什么要使用流程流程编排
问题原因
在我们日常开发中,当我们接到一个复杂业务需求时,大部分开发同学都是通过面向过程编程代码,瀑布式编写代码风格,当新增新的需求时我们需要修改当前的方法,当需求变更很频繁代码的维护成本和测试复杂度成指数级上升,当开发同学离职,下一个新来同学维护代码就很头痛😣,新的需求不知道如何下手。
解决办法
而我们可以通过流程编排来实现编写代码,流程编排就是将一段复杂的业务代码拆分成一个个节点/步骤然后聚合成一条执行链,实现了代码解耦,提高了代码复用性,也遵循了设计模式SOLID设计思想。
流程编排概念
概念
流程编排就是将接口或者一段复杂的业务代码拆分成一个个节点/步骤然后聚合成一条执行链,流程编排适合复杂的业务系统,列如电商交易系统。
名词
-
流程: 执行处理的整个过程(通常可以理解为一个接口,如下单接口)
-
节点/步骤:一个完整的流程可以分为多个节点,每个节点可以完成单一的业务行为,比如下单流程中的限流、限购、拆单等。一个节点通常是一个类或者spring bean。
-
上下文:将节点返回的数据设置到上下文context中,让后续节点能够获取到相关数据(如订单结算节点需要获取到初始化购物车节点中的信息)
生活中的案例
流程图
流程图
时序图
执行器
流程节点
代码是如何实现
流程上下文
功能:流程上下文是用来处理流程节点请求参数和返回参数传递
/**
* @Classname Context
* @Description 流程引擎上下文
* @Date 2023/7/20 17:53
* @Created by ZouLiPing
*/
public class Context<T> {
/**
* 请求参数上下文
*/
private RequestContext request = new RequestContext();
/**
* 返回参数上下文
*/
private ResponseContext response = new ResponseContext();
/**
* 流程标签 {@link com.zlp.process.enums.ProcessTypeEnum}
*/
private String processTag;
/**
* 业务类型
*/
private Integer billType;
public RequestContext getRequest() {
return request;
}
public <T> void createRequestBody(T body) {
request.setBody(body);
}
public ResponseContext getResponse() {
return response;
}
public <T> void createResponseBody(T body) {
response.setBody(body);
}
public void setProcessTag(String processTag) {
this.processTag = processTag;
}
public String getProcessTag() {
return processTag;
}
public Integer getBillType() {
return billType;
}
public void setBillType(Integer billType) {
this.billType = billType;
}
}
/**
* @Description 流程引擎请求参数上下文
* @Date 2023/7/20 17:53
* @Created by ZouLiPing
*/
public class RequestContext<T> {
private Map<String, Object> header = new ConcurrentHashMap<>();
private T body;
public T getBody() {
return body;
}
public void setBody(T body) {
this.body = body;
}
public Object getHeaderValueByKey(String key) {
return header.get(key);
}
public void setHeaderValueByKey(String key, Object value) {
if (header.containsKey(key)) {
header.replace(key, value);
} else {
header.put(key, value);
}
}
public void removeHeaderKey(String key)
{
if (header.containsKey(key)) {
header.remove(key);
}
}
}
/**
* @Description 流程引擎返回参数上下文
* @Date 2023/7/22 17:53
* @Created by ZouLiPing
*/
public class ResponseContext<T>{
private Map<String, Object> header = new ConcurrentHashMap<>();
public T getBody() {
return body;
}
public void setBody(T body) {
this.body = body;
}
private T body;
public Object getHeaderValueByKey(String key) {
return getHeader().get(key);
}
public void setHeaderValueByKey(String key, Object value) {
if (getHeader().containsKey(key)) {
getHeader().replace(key, value);
} else {
getHeader().put(key, value);
}
}
public Map<String, Object> getHeader() {
return header;
}
public void setHeader(Map<String, Object> header) {
this.header = header;
}
流程编排文件
功能:用来定义流程标签名称对应流程节点列表关系映射
[
{
"processTag":"ORDER_SECKILL_VIN_TAG",
"nodeList":[
"活动时间流程节点-PromoTimeValidNode",
"商品库存校验流程节点-GoodsStockValidNode",
"车主校验流程节点-VehicleOwnerValidNode",
"限购条件流程节点-LimitBuyValidNode",
"车龄验证流程节点-VehicleAgeValidNode",
"车型验证流程节点-VehicleTypeValidNode",
"订单业务组件流程节点-OrderServiceNode"
]
}
]
流程节点
功能:流程工厂主要的功能是将编排好节点文件按照顺序放到容器中
/**
* @Classname NodeComponent
* @Description 节点组件基类
* @Date 2023/7/20 22:13
* @Created by ZouLiPing2
*/
@Slf4j(topic = "NodeComponent" )
public abstract class NodeComponent {
/**
* 执行条件判断分支,可以根据类型分支判断
*
* @param context
* @return boolean
*/
public void condition(Context context){
log.info("执行条件语句判断");
}
/**
* 处理上下文
* @author ZouLiPing
* @param context
* @date 2023/7/20 22:15
*/
public abstract void process(Context context);
/**
* 获取节点名称
* 节点名称会放在node.js中,需要保证一一对应
*/
public abstract String getProcessNodeName();
@Override
public String toString() {
return getProcessNodeName();
}
}
/**
* @Description 车主校验流程节点
* @Date 2023/7/20 22:22
* @Created by ZouLiPing
*/
@Component
@RequiredArgsConstructor
@Slf4j(topic = "VehicleOwnerValidNode")
public class VehicleOwnerValidNode extends NodeComponent {
private final VehicleService vehicleService;
@Override
public boolean condition(Context context) {
if (Objects.equals(context.getBillType(), BillTypeEnum.BILL_TYPE_10001)) {
OrderReq orderReq = OrderContextUtils.getOrderReq(context);
Vehicle vehicle =vehicleService.getVehicleByModelCode(orderReq.getAssociateVin());
if (!Objects.equals(vehicle.getVin(),orderReq.getAssociateVin())){
throw new CustomException(ResultCode.PROCESS_ORDER_20003.getCode(), ResultCode.PROCESS_ORDER_20003.getMessage(), MethodUtil.getLineInfo());
}
return Boolean.TRUE;
}
return false;
}
@Override
public void process(Context context) {
OrderReq orderReq = OrderContextUtils.getOrderReq(context);
Vehicle vehicle =vehicleService.getVehicleByVin(orderReq.getAssociateVin());
if (!Objects.equals(vehicle.getVin(),orderReq.getAssociateVin())){
throw new CustomException(ResultCode.PROCESS_ORDER_20003.getCode(), ResultCode.PROCESS_ORDER_20003.getMessage(), MethodUtil.getLineInfo());
}
}
@Override
public String getProcessNodeName() {
return "车主校验流程节点-VehicleOwnerValidNode";
}
}
/**
* @Description 订单业务组件流程节点
* @Date 2023/7/20 22:22
* @Created by ZouLiPing
*/
@Component
@RequiredArgsConstructor
@Slf4j(topic = "OrderServiceNode")
public class OrderServiceNode extends NodeComponent {
private final OrderService orderService;
@Override
public boolean condition(Context context) {
return Boolean.FALSE;
}
@Override
public void process(Context context) {
OrderReq orderReq = OrderContextUtils.getOrderReq(context);
Order order = orderService.createOrder(orderReq);
OrderResp orderResp = OrderContextUtils.getOrderResp(context);
orderResp.setReqNo(String.valueOf(System.currentTimeMillis()));
orderResp.setStatus(StatusEnum.NORMAL.getValue());
orderResp.setOrderCode(order.getOrderCode());
orderResp.setPoint((int)(Math.random()*100));
OrderResp.SkuResp skuResp = new OrderResp.SkuResp();
skuResp.setSkuId(125);
skuResp.setSpuId(456);
skuResp.setQuantity(1);
skuResp.setPromoId(567);
skuResp.setStock(45);
skuResp.setLimit(1);
skuResp.setTotalLimit(3);
orderResp.setSkuRespList(Arrays.asList(skuResp));
context.createResponseBody(orderResp);
}
@Override
public String getProcessNodeName() {
return "订单业务组件流程节点-OrderServiceNode";
}
}
流程执行工厂
功能:流程工厂主要的功能是将编排好节点文件按照顺序放到容器中,在项目启动中通过ApplicationContextAware注入到容器中,方便通过流程引擎调用。
/**
* 流程编排工厂
*
* @date: 2023/7/21 21:00
*/
@Component
@Slf4j(topic = "ProcessNodesFactory")
public class ProcessNodesFactory implements ApplicationContextAware {
/**
* 存储的结构如下:
* 流程标签名称->流程节点名称 列表映射关系
*/
private ListMultimap<String, NodeComponent> processNodes = LinkedListMultimap.create(12);
/**
* 存储的结构如下:
* 节点名称->节点Bean 列表映射关系
*/
private ConcurrentHashMap<String, NodeComponent> nodeName = new ConcurrentHashMap<>(12);
@Value("classpath:process/node.json")
private Resource nodesJson;
/**
* 通过流程标签名称获取流程节点实例
*
* @Date: 2023/7/21 21:00
*/
public List<NodeComponent> getNodeList(String processTag) {
return processNodes.get(processTag);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
log.info("[节点工厂]->开始加载流程节点编排");
Map<String, NodeComponent> beansOfType = applicationContext.getBeansOfType(NodeComponent.class);
beansOfType.values().forEach(node -> {
String processNodeName = node.getProcessNodeName();
if (nodeName.containsKey(processNodeName)) {
throw new CustomException(ResultCode.PROCESS_ORDER_20001.getCode(),
String.format(ResultCode.PROCESS_ORDER_20001.getMessage(), processNodeName), MethodUtil.getLineInfo());
}
nodeName.put(processNodeName, node);
});
// 解析节点编排文件
String jsonNodeData;
try {
File file = nodesJson.getFile();
jsonNodeData = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
} catch (IOException e) {
log.error("读取节点流程失败:error={}", e);
throw new CustomException(ResultCode.PROCESS_ORDER_20003.getCode(), ResultCode.PROCESS_ORDER_20003.getMessage(), MethodUtil.getLineInfo());
}
List<ProcessTagNodes> processTagNodes = JSON.parseArray(jsonNodeData, ProcessTagNodes.class);
for (ProcessTagNodes processNode : processTagNodes) {
for (String node : processNode.nodeList) {
NodeComponent nodeComponent = nodeName.get(node);
if (Objects.isNull(nodeComponent)) {
throw new CustomException(ResultCode.PROCESS_ORDER_20004.getCode(),
String.format(ResultCode.PROCESS_ORDER_20004.getMessage(), node), MethodUtil.getLineInfo());
}
processNodes.put(processNode.processTag, nodeComponent);
}
}
log.info("[节点工厂] 加载完毕,所有的编排为={}", JSON.toJSONString(processNodes.asMap()));
}
/**
* 流程节点名称对象
*/
@Data
public static class ProcessTagNodes {
/**
* 流程标签名称
*/
private String processTag;
/**
* 流程节点列表
*/
private List<String> nodeList;
}
}
流程引擎
功能:流程引擎主要的功能是将上下文参数交给流程引擎处理流程节点,并返回上下文
/**
* 流程编排工厂
*
* @date: 2023/7/21 21:00
*/
@Component
@Slf4j(topic = "ProcessNodesFactory")
public class ProcessNodesFactory implements ApplicationContextAware {
/**
* 存储的结构如下:
* 流程标签名称->流程节点名称 列表映射关系
*/
private ListMultimap<String, NodeComponent> processNodes = LinkedListMultimap.create(12);
/**
* 存储的结构如下:
* 节点名称->节点Bean 列表映射关系
*/
private ConcurrentHashMap<String, NodeComponent> nodeComponents = new ConcurrentHashMap<>(12);
@Value("classpath:process/node.json")
private Resource nodesJson;
/**
* 通过流程标签名称获取流程节点实例
*
* @Date: 2023/7/21 21:00
*/
public List<NodeComponent> getNodeList(String processTag) {
return processNodes.get(processTag);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
log.info("[节点工厂]->开始加载流程节点编排");
Map<String, NodeComponent> beansOfType = applicationContext.getBeansOfType(NodeComponent.class);
beansOfType.values().forEach(node -> {
String processNodeName = node.getProcessNodeName();
if (nodeComponents.containsKey(processNodeName)) {
throw new CustomException(ResultCode.PROCESS_ORDER_20001.getCode(),
String.format(ResultCode.PROCESS_ORDER_20001.getMessage(), processNodeName), MethodUtil.getLineInfo());
}
nodeComponents.put(processNodeName, node);
});
// 解析节点编排文件
String jsonNodeData;
try {
File file = nodesJson.getFile();
jsonNodeData = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
} catch (IOException e) {
log.error("读取节点流程失败:error={}", e);
throw new CustomException(ResultCode.PROCESS_ORDER_20003.getCode(), ResultCode.PROCESS_ORDER_20003.getMessage(), MethodUtil.getLineInfo());
}
List<ProcessTagNodes> processTagNodes = JSON.parseArray(jsonNodeData, ProcessTagNodes.class);
for (ProcessTagNodes processNode : processTagNodes) {
for (String node : processNode.nodeList) {
NodeComponent nodeComponent = nodeComponents.get(node);
if (Objects.isNull(nodeComponent)) {
throw new CustomException(ResultCode.PROCESS_ORDER_20004.getCode(),
String.format(ResultCode.PROCESS_ORDER_20004.getMessage(), node), MethodUtil.getLineInfo());
}
processNodes.put(processNode.processTag, nodeComponent);
}
}
log.info("[节点工厂] 加载完毕,所有的编排为={}", JSON.toJSONString(processNodes.asMap()));
}
/**
* 流程节点名称对象
*/
@Data
public static class ProcessTagNodes {
/**
* 流程标签名称
*/
private String processTag;
/**
* 流程节点列表
*/
private List<String> nodeList;
}
}
流程实例定义和执行
功能:通过模板方案定义执行器的骨架,把公共的校验和执行流程在基类执行,子类可以单独实现自己的方法
/**
* 流程执行器接口
*/
public interface ExecutorService {
/**
* @param context
*
* 业务方案参数校验
*/
void verification(Context context);
/**
* @param context
*
* 执行业务
*/
ResponseContext execute(Context context);
/**
* @param context
* 服务结束(返回参数)
*/
void stop(Context context);
}
/**
* @author :ZouLiPing
* @Date :2023年7月22日15:21:47
* @description:流程实例基础执行类
*/
@Slf4j(topic = "AbstractProcessExecutorService")
public abstract class AbstractProcessExecutorService implements ExecutorService {
@Resource
private ProcessEngin processEngin;
@Resource
protected ApplicationContext applicationContext;
/**
* AbstractProcessExecutorService
*/
protected AbstractProcessExecutorService proxy;
@PostConstruct
public void init() {
log.info("init 从Spring上下文中获取AOP代理对象");
//从Spring上下文中获取AOP代理对象
proxy = applicationContext.getBean(this.getClass());
}
@Override
public void verification(Context context) {
log.info("校验参数start processTag={}",context.getProcessTag());
String processCode = context.getProcessTag();
Assert.isFalse(Objects.isNull(processEngin),"[流程引擎]上下文对象为空!");
Assert.isFalse(Objects.isNull(processCode),"[流程引擎]节点名称为空!");
ProcessTypeEnum.checkProcessTag(processCode);
}
@Override
public ResponseContext execute(Context context){
log.info("execute 流程实例执行请求参数request={}", JSON.toJSONString(context.getRequest().getBody()));
try {
proxy.verification(context);
processEngin.process(context.getProcessTag(), context);
}catch (CustomException e){
log.error("流程实例执行自定义异常",e);
throw new CustomException(e.getCode(),e.getMessage(),e.getMethod());
}catch (Exception e){
log.error("流程实例执行异常",e);
throw new CustomException(ResultCode.FAILED.getCode(),e.getMessage(), MethodUtil.getLineInfo());
}
proxy.stop(context);
ResponseContext response = context.getResponse();
log.info("execute 流程实例执行结束返回参数 response={}", JSON.toJSONString(response.getBody()));
return response;
}
}
/**
* 流程节点执行
*
* @Date 2023/7/20 22:13
* @Created by ZouLiPing
*/
@Component
@Slf4j(topic = "ProcessNodeExecute")
public class OrderProcessExecute extends AbstractProcessExecutorService implements ExecutorService {
@Override
public void verification(Context serviceContext) {
super.verification(serviceContext);
}
@Override
public ResponseContext execute(Context context) {
return super.execute(context);
}
@Override
public void stop(Context context) {
OrderResp orderResp = OrderContextUtils.getOrderResp(context);
orderResp.setAddress("上海市浦东新区航头镇123456789");
}
}
流程测试
/**
* 流程引擎测试
* @Date 2022/9/21 12:57
* @Created by ZouLiPing
*/
@RestController
@RequestMapping("/process")
@RequiredArgsConstructor
@Api(value = "process", tags = "流程引擎测试")
public class ProcessController {
private final OrderProcessExecute processNodeExecute;
@ApiOperation( "订单流程引擎测试")
@PostMapping(value = "orderProcessTest")
public Result<OrderResp> orderProcessTest(@Valid @RequestBody OrderReq orderReq){
Context context = new Context();
context.setProcessTag(orderReq.getProcessTag());
context.setBillType(orderReq.getBillType());
context.createRequestBody(orderReq);
ResponseContext responseContext = processNodeExecute.execute(context);
return Result.success(OrderContextUtils.getOrderResp(responseContext));
}
}
接口地址:/basic/process/orderProcessTest
请求方式:POST
请求示例:
{
"associateDealer": "",
"associateVin": "",
"billType": 0,
"processTag": "",
"receiverAddressId": 0,
"skuReqList": [
{
"promoId": "",
"quantity": 0,
"skuId": 0,
"spuId": 0
}
],
"userId": 0
}
请求参数:
参数名称 | 参数说明 | in | 是否必须 | 数据类型 | schema |
---|---|---|---|---|---|
orderReq | orderReq | body | true | 订单请求参数 | 订单请求参数 |
associateDealer | 经销商名称 | false | string | ||
associateVin | 车牌号码 | false | string | ||
billType | 业务类型 | false | integer(int32) | ||
processTag | 流程标签名称为空 | true | string | ||
receiverAddressId | 收货地址信息 | false | integer(int32) | ||
skuReqList | sku信息为空 | true | array | SKU请求信息 | |
promoId | 活动ID | false | string | ||
quantity | 数量 | false | integer(int32) | ||
skuId | skuId | false | integer(int32) | ||
spuId | spuId | false | integer(int32) | ||
userId | 用户ID | false | integer(int32) |
响应参数:
参数名称 | 参数说明 | 类型 | schema |
---|---|---|---|
code | 错误码 | integer(int64) | integer(int64) |
data | 响应数据 | 订单返回参数 | 订单返回参数 |
orderCode | 订单号 | string | |
point | 用户V值 | integer(int32) | |
processTag | 流程标签名称为空 | string | |
reqNo | 请求唯一编号 | string | |
skuRespList | SKU返回信息 | array | SKU返回信息 |
limit | 限购数量 | integer(int32) | |
promoId | 活动ID | integer(int32) | |
quantity | 数量 | integer(int32) | |
skuId | skuId | integer(int32) | |
spuId | spuId | integer(int32) | |
stock | 库存 | integer(int32) | |
totalLimit | 秒杀商品总库存 | integer(int32) | |
status | 状态 | integer(int32) | |
message | 提示信息 | string |
总结
优点
- 实现代码复用性(解决代码修改错、漏、不一致问题)。一个公共节点可以在多套流程中复用 ;
- 符合设计模式单一职责,开闭原则思想。一个节点只需要聚焦某一块业务上,如果产品需要在新增一个需求,只需要在新增一个节点,不需要再原先节点上进行修改;
- 方便理解业务。通过json格式配置配上每个节点的注释很好的能够帮助新同学理解该流程实现的过程 ;
- 每个流程节点可以有单独测试用例;
缺点
- 增加代码编写和理解的难度(毕竟新增一个节点需要新起一个类);
- 流程节点密度拆分;
小结
流程编排和责任链的区别
为什么说编程编排是“非典型”责任链模式
流程编排的SOLID
- S (单一职责原则):每个“珠子”职责清晰
- O (开闭原则):业务逻辑新增则新增“珠子”
- D (依赖倒置原则):流程引擎执行的是抽象的“珠子接口”,具体“珠子”是使用时注入
避免流程化的陷进
系统流程和业务流程有什么区别
**系统流程:**系统流程指的是强调的是一个线程一口气执行完的过程,用于描绘系统物理模型,表达数据在系统各个部件之间(程序、文件、数据库、人工过程等)流动的情况。
**业务流程:**业务流程是指一个业务行为的过程,比如请假的流程需要先提交申请,再逐级审批,最后请假成功这样的过程。
代码链接地址:
改变/basic-project
分支:pipeline-0722
参考链接
流程编排在电商系统中应用_uhana的博客-CSDN博客
【学架构也可以很有趣】【“趣”学架构】- 3.搭完架子串珠子_哔哩哔哩_bilibili
系统流程图和业务流程图的区别是什么?