1、camunda简介
1、介绍:
Camunda是一种工作流引擎。
Camunda BPM 是一个轻量级、开源灵活的工作流框架,是由Java开发的一个纯Java库。它的核心是一个在Java虚拟机内部运行的原生BPMN 2.0流程引擎,因此它可以嵌入到任何Java应用程序或运行时容器中。
官方文档:https://docs.camunda.org
源码地址:https://github.com/camunda
2、原理
动态探针:动态探针是应用程序没有定义,在程序运行时动态添加的探针。动态探针类似于异常处理机制,当系统产生一个异常,就会跳转去执行对应的 handle。
动态探针会在函数入口和出口插入一些断点,程序执行到断点时候会去执行对应的 handle,从而达到观测应用程序的目的。这里的中断是指 trap(陷阱),在X86体系是int3指令。
Camunda采用动态探针技术实现的动态追踪。用于监视生产系统中的事件。记录特定事件,跟踪问题等。
2、快速构建camunda demo项目
地址:https://start.camunda.com/
上面可选的版本比较新,如果项目中springBoot版本比较老可以参照:camunda版本和springBoot版本对照
下载之后自行把数据库连接信息改成mysql即可,然后第一次启动会生成camunda相关的表(49个)如下:
ACT_RE_:RE代表存repository。带有此前缀的表包含“静态”信息,例如流程定义和流程资源(图像、规则等)。
ACT_RU_:RU代表runtime。这些是运行时表,包含流程实例、用户任务、变量、作业等的运行时数据。引擎仅在流程实例执行期间存储运行时数据,并在流程实例结束时删除记录。这使运行时表既小又快。
ACT_ID_:ID代表identity。这些表包含身份信息,例如用户、组等。
ACT_HI_:HI代表history。这些是包含历史数据的表,例如过去的流程实例、变量、任务等。
ACT_GE_*:GE代表 general一般数据,用于各种用例。
3、camunda模型BPMN、DMN、FROM介绍
camundaBPM和modeler历史版本下载:https://downloads.camunda.cloud/release/
camunda官方文档介绍:https://docs.camunda.org/manual/7.16/reference/bpmn20/
camunda模型BPMN、DMN、FROM介绍(中文)。
**流程图样例:**
流程的开启方式:
1、在camunda管理系统中点startProcess开启。
2、用postMan调开启流程接口(post请求)。
3、自定义接口,然后在接口中调用RuntimeService的开启流程方法。
4、Servcie Task - 调用外部服务
4.1 调用Java代码 - 实现JavaDelegate
Java Class - 指定代理类全路径(实现接口JavaDelegate、ActivityBehavior,抛异常则回滚到上一步),如:com.example.workflow.delegate.MyDelegate
Delegate Expression - 使用表达式来提取代理对象(实现接口JavaDelegate、ActivityBehavior),更适合SpringBoot环境,如:
m
y
D
e
l
e
g
a
t
e
B
e
a
n
E
x
p
r
e
s
s
i
o
n
−
调
用
方
法
表
达
式
,
如
:
{myDelegateBean} Expression - 调用方法表达式,如:
myDelegateBeanExpression−调用方法表达式,如:{myBean.doWork()}
Expression - 解析值表达式(Evaluating a value expression)
4.2 External (External Task)
指定task对应的topic
由外部Worker通过REST API(Long Polling)主动获取并锁定(可指定超时时间)任务(亦支持Java API)
外部worker通知流程 完成Complete 或 失败Failure
SpringBoot ExternalTaskHandler示例代码:
@ExternalTaskSubscription(topicName = "payment-biz")
@Bean
public ExternalTaskHandler paymentBizHandler() {
return (externalTask, externalTaskService) -> {
//获取流程变量
String productName = externalTask.getVariable(PRODUCT_NAME);
Double productPrice = externalTask.getVariable(PRODUCT_PRICE);
String paymentAssignee = externalTask.getVariable(PAYMENT_ASSIGNEE);
Double productDiscountPrice = externalTask.getVariable(PRODUCT_DISCOUNT_PRICE);
log.info("The External Task {} has been checked!", externalTask.getId());
//完成任务
externalTaskService.complete(externalTask);
//完成任务且添加流程变量
//externalTaskService.complete(externalTask, Variables.putValueTyped("creditScores", creditScoresObject));
log.info("Payment SUCCESS - paymentAssignee={}, productName={}, productPrice={}, productDiscountPrice={}",
paymentAssignee,
productName,
productPrice,
productDiscountPrice);
//任务失败
//externalTaskService.handleFailure(
// externalTask,
// "errorMsg",
// "errorDetails",
// 1,
// 10L * 60L * 1000L);
};
}
4.3 调用web服务(REST、SOAP)
关于使用connector的细节可参见:
https://docs.camunda.org/manual/latest/user-guide/process-engine/connectors
https://github.com/camunda/camunda-bpm-examples/tree/master/servicetask
5、Business Rule Task - 业务规则任务(可绑定DMN)
如下图可通过Business Rule Task绑定单独的DMN文件中的Decision决策表格,
以此实现根据不同输入产生不同输出的效果,
DMN输出结果需通过Result Variable等设置进行映射,
映射之后才可供后续流程元素进行访问。
例如如上图中的Decision Ref中product-discount-rule即对应DMN模型中的Desition.id。
如下即为具体决策表格内容
6、会签流程介绍。
会签: 在流程业务管理中,任务是通常都是由一个人去处理的,而多个人同时处理一个任务,这种任务我们称之为会签任务。
6.1、按照会签执行的顺序分为:
a)串行会签:串行会签也叫顺序会签,指按照提交流程处理人的次序user1、user2、user3依次接收待办任务,并按顺序处理流程。
b) 并行会签:指user1、user2、user3同时接收到流程待办任务,并行处理。
6.2、按照会签通过的比例分为:
a) 全部通过:会签人全部审批通过表决后,会签通过。
b) 按数量通过:达到一定数量的通过表决后,会签通过。
c) 按比例通过:达到一定比例的通过表决后,会签通过。
d) 一票通过:只要有一个表决通过的,会签通过。
e) 一票否决:只要有一个表决时否定的,会签通过。
6.3 会签参数设置
主要参数配置说明:
1、 loop cardinality:循环基数。可选项。可以直接填整数,表示会签的人数。
2、 Collection:集合。可选项。会签人数的集合,通常为list,和loop cardinality二选一。
3、 Element variable:元素变量。选择Collection时必选,为collection集合每次遍历的元素。
4、 Completion condition:完成条件。可选。比如设置一个人完成后会签结束,那么其他人的代办任务都会消失。
6.4 配置会签流程处理人
需要注意的是,右侧的Assignee,Candidate Users,Candidate Groups,分别表示按照负责人/候选用户/候选组。
采用Assignee,填写上一步中的Element Variable字段的内容,即可获取当前审批人,注意是动态变量,${}格式,即会签人Collection中遍历的每一个人赋值给该变量。
6.5 其它说明
1、为并行节点设置 Completion Condition 条件为 ${nrOfCompletedInstances == 1},是实现或签的方法。
2、参考上述代码,设置变量 .setVariable(“assignee”,“新加签的用户”),可以为并行会签节点进行加签。
3、为并行节点设置 Completion Condition 属性,条件设置为 ${nrOfPassInstances / nrOfInstances > 0.5},是实现会签比例大于 50% 自动通过的方法(其中 nrOfPassInstances 不是 camunda 内置变量,是通过 complete 监听器自己维护的一个变量,节点每个实例完成后如果是同意则 nrOfPassInstances 加1)。
7、camunda核心对象ProcessEngine
7.1 ProcessEngine初始化
ProcessEngine是Camunda流程引擎的核心。我们在流程中的很多具体的处理比如流程部署、流程部署、流程审批等操作都是通过XXXService来处理的。而相关的XXXService都是通过ProcessEngine来管理的。
在SpringBoot项目会根据我们导入的依赖完成自动装配,从而完成ProcessEngine的自动注入。
7.2 从ProcessEngine中,你可以获得包含工作流/BPM方法的各种服务
服务名称 | 介绍 |
---|---|
运行时服务-RuntimeService | 首先可以通过一个流程定义启动多个流程实例。也能用于处理检索和存储流程变量的服务 |
过滤器服务-FilterService | 允许创建和管理过滤器。过滤器是像任务查询一样的存储查询。例如,过滤器被任务列表用来过滤用户任务 |
身份服务-IdentityService | 是非常简单的。它允许对组和用户进行管理(创建、更新、删除、查询…)。重要的是要理解,核心引擎实际上在运行时并不对用户进行任何检查 |
表单服务-FormService | 一个可选的服务。提供了表单功能 |
管理服务-ManagementService | 它允许检索关于数据库表和表元数据的信息。此外,它暴露了查询功能和Job的管理操作。Job在引擎中被用于各种事情,如定时器、异步延续、延迟暂停/激活等。 |
案例服务-CaseService | 与运行时服务(RuntimeService)类似,但用于案例实例。它处理启动案例定义的新案例实例并管理案例执行的生命周期。该服务也被用来检索和更新案例实例的流程变量。 |
外部任务服务-ExternalTaskService | 提供对外部任务实例的访问。外部任务代表在外部处理的工作项目,独立于流程引擎。 |
历史服务-HistoryService | 暴露了引擎收集的所有历史数据。当执行流程时,引擎可以保留很多数据(这是可配置的),如流程实例的开始时间、谁做了哪些任务、完成任务花了多长时间、每个流程实例遵循的路径等。该服务主要暴露了访问这些数据的查询功能。 |
决策服务-DecisionService | 允许评估部署在引擎中的决策。它是评估独立于流程定义的业务规则任务中的决策的一种选择。 |
任务服务-TaskService | 围绕用户审批操作的一切都会被归纳到TaskService。比如:查询分配给用户或组的任务,创建新的独立任务。这些是与流程实例无关的任务,操纵一个任务被分配给哪个用户,或者哪个用户以某种方式参与到任务中,声称并完成一项任务。声称意味着有人决定成为该任务的受让人,意味着这个用户将完成该任务。完成意味着 “完成任务的工作”等 |
仓库服务RepositoryService | 提供了管理和操纵部署和流程定义的操作 |
7.3 RuntimeService介绍
常用操作:
可根据不同API启动实例,并返回ProcessInstance对象;
可异步删除所有实例,并返回Batch对象;
可获取变量信息,并返回VariableMap对象;
可获取变量信息,并返回Map<String, Object>对象;
可获取单个变量信息,并返回TypedValue;
可查询执行实例,并返回ExecutionQuery对象;
可自定义SQL查询并执行实例,并返回NativeExecutionQuery对象;
可查询流程实例信息,并返回ProcessInstanceQuery对象;
可自定义SQL查询流程实例信息,并返回NativeProcessInstanceQuery对象;
可查询Incident相关信息,并返回Incident对象;
开启流程示例:
从任意节点启动实例:
@Test
public void createProcessInstanceByKey() {
String processInstanceByKey = "leave";
String activityId = "Activity_1r8r4jn";
ProcessInstantiationBuilder processInstantiationBuilder = runtimeService.createProcessInstanceByKey(processInstanceByKey);
ProcessInstance processInstance = processInstantiationBuilder.businessKey("001")
//某节点之前开始
.startBeforeActivity(activityId)
.execute();
System.out.println(processInstance.getId() + "," + processInstance.getBusinessKey() + "," + processInstance.getProcessDefinitionId());
}
@Test
public void createProcessInstanceByKey2() {
String processInstanceByKey = "leave";
String activityId = "Flow_1c279og";
ProcessInstantiationBuilder processInstantiationBuilder = runtimeService.createProcessInstanceByKey(processInstanceByKey);
ProcessInstance processInstance = processInstantiationBuilder.businessKey("001")
//从连线开始触发
.startTransition(activityId)
.execute();
System.out.println(processInstance.getId() + "," + processInstance.getBusinessKey() + "," + processInstance.getProcessDefinitionId());
}
@Test
public void createProcessInstanceByKey3() {
String processInstanceByKey = "leave";
String activityId = "Activity_0zafxq7";
ProcessInstantiationBuilder processInstantiationBuilder = runtimeService.createProcessInstanceByKey(processInstanceByKey);
ProcessInstance processInstance = processInstantiationBuilder.businessKey("001")
//某节点之后
.startAfterActivity(activityId)
.execute();
System.out.println(processInstance.getId() + "," + processInstance.getBusinessKey() + "," + processInstance.getProcessDefinitionId());
}
@Test
public void createProcessInstanceByKey4() {
String processInstanceByKey = "leave";
String activityId = "Activity_0zafxq7";
//跳过监听器
boolean skipCustomListeners = true;
boolean skipIoMappings = true;
ProcessInstantiationBuilder processInstantiationBuilder = runtimeService.createProcessInstanceByKey(processInstanceByKey);
ProcessInstance processInstance = processInstantiationBuilder.businessKey("001")
.startAfterActivity(activityId)
.execute(skipCustomListeners, skipIoMappings);
System.out.println(processInstance.getId() + "," + processInstance.getBusinessKey() + "," + processInstance.getProcessDefinitionId());
}
当流程实例启动时,以下两张表会有新的数据插入:
ACT_RU_EXECUTION (正在执行对象表)
ACT_RU_TASK (正在执行任务表)
同时会有几张历史表也会插入数据(ACT_HI_XXXX都是历史表):
ACT_HI_ACTINST
ACT_HI_TASKINST
ACT_HI_PROCINST (历史流程实例表与运行实例一对一)
ACT_HI_IDENTITYLINK
**驳回到某个节点示例:**
ActivityInstance tree = runtimeService.getActivityInstance(processInstanceId);
List<HistoricActivityInstance> resultList = historyService
.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.activityType("userTask")
.finished()
.orderByHistoricActivityInstanceEndTime()
.asc()
.list();
//得到任务节点id
List<HistoricActivityInstance> historicActivityInstanceList = resultList.stream().filter(historicActivityInstance -> historicActivityInstance.getActivityId().equals(rejectTaskDTO.getTaskKey())).collect(Collectors.toList());
HistoricActivityInstance historicActivityInstance = historicActivityInstanceList.get(0);
String toActId = historicActivityInstance.getActivityId();
taskService.createComment(task.getId(), processInstanceId, rejectTaskDTO.getMessage());
runtimeService.createProcessInstanceModification(processInstanceId)
.cancelActivityInstance(getInstanceIdForActivity(tree, task.getTaskDefinitionKey()))
.cancelAllForActivity(currentTaskId)
.setAnnotation("进行了驳回到指定任务节点操作")
.startBeforeActivity(toActId)//启动目标活动节点
.execute();
获取正在运行的实例节点:
/**
* select * from ACT_RU_EXECUTION where PROC_INST_ID_ = ?
*/
@Test
public void getActiveActivityIds() {
//执行实例ID
String executionId = "6001";
List<String> activeActivityIds = runtimeService.getActiveActivityIds(executionId);
System.out.println("####" + activeActivityIds);
}
@Test
public void getActivityInstance() {
//执行实例ID
String processInstanceId = "6001";
ActivityInstance activityInstance = runtimeService.getActivityInstance(processInstanceId);
System.out.println(activityInstance);
}
7.4 TaskService介绍
一个实例启动过后,对应的任务需要完成才会走入下一个节点,这时候我们会有大量对Task的操作,从产品的角度来看,一个任务启动会根据规则分配到人,然后该用户获取自己的任务列表,进行操作后完成该任务。
以下是任务常用方法及说明:
/**
* 通过受理人查询任务
*/
public List<Task> getTasksByAssignee(String assignee, int first, int max) {
return taskService.createTaskQuery().taskAssignee(assignee).listPage(first, max);
}
/**
* 通过流程id查询任务
*/
public List<Task> getTasksByProcessInstanceId(String processInstanceId, int first, int max) {
return taskService.createTaskQuery().processInstanceId(processInstanceId).listPage(first, max);
}
/**
* 完成任务
*/
public void completeTask(String taskId) {
taskService.complete(taskId);
}
/**
* 通过任务id设置变量
*/
public void setVariByTaskId(String taskId,Map<String,Object> variMap) {
taskService.setVariables(taskId,variMap);
}
/**
* 通过任务id查询
*/
public Task getTaskById(String taskId) {
return taskService.createTaskQuery().taskId(taskId).singleResult();
}
任务相关表:
ACT_HI_TASKINST
ACT_HI_IDENTITYLINK
ACT_HI_ACTINST
ACT_RU_TASK
8、camunda监听器
8.1 执行监听器
执行监听器的触发事件有:start、end、take;其中节点有start、end两种事件,而连线则有take事件。
如下给或签节点设置了开始事件和结束事件的执行监听器:
开启一条流程实例测试一下,这里我们在发起流程时就把所有节点审批人参数设置好。
发起人节点调用审批通过后就触发或签节点的执行监听器开始事件,两次是因为该监听事件是设置在节点上的,而或签节点设置了两个处理人,所以它会创建两个待办任务,每个待办任务都有对应的执行器,等到或签节点有人审批通过了,就会触发执行监听器结束事件。
说明:在触发该监听器之前,流程引擎就需要知道该节点上有几个待办任务从而创建对应数量的执行器。所以你可以选择在节点上的执行监听器结束事件触发时去设置下一节点审批人
8.2 任务监听器
任务监听器的触发事件有:create, assignment, update, complete, delete or timeout。
这里我们演示常用的create、assignment、complete事件。
重新部署后,把审批人设置方式再改回到发起流程时设置,然后发起流程实例,这里如果执行监听器的触发在任务监听器之前,那就更不能在任务监听器上动态设置审批人了。
所以动态设置审批人的条件就是要在节点执行监听器的开始事件触发之前就设置好审批人参数。
总结:
9、问题总结。
1、springBoot版本和camunda版本不一致问题。
2、流程开启后需要记录流程id,然后再通过流程id+用户查询任务并完成任务。
3、用户任务节点的审批人必须在该节点之前设置好。
4、使用runtimeservice开启流程时会造成线程阻塞,直到用户节点或结束节点。所以这里在实际使用时看是否需要异步。
5、如果发生异常,回滚到上一节点,如何查看该流程卡在哪个节点上。
10、问题探讨。
1、数据库表如何分离,camunda的49张表和业务表如何分开?
2、任务表、用户、用户组、租户等这些表如何用?是自己实现还是直接用camunda的?