整合Rocketmq实现审批流消息推送

news2025/1/23 11:15:56

文章目录

      • Docker 部署 RocketMQ
        • 拉取 RocketMQ 镜像
        • 创建容器共享网络
      • 部署NameServer
        • 创建目录并授予权限
        • 拷贝启动脚本
        • 启动容器`NameServer`
      • 部署Broker + Proxy
        • 创建挂载文件夹并授权
        • 创建`broker.cnf`文件
        • 拷贝启动脚本
        • 启动容器Broker
      • 部署RocketMQ控制台(rocketmq-dashboard)
        • 拉取镜像
        • 启动容器Rocketmq-dashboard
        • 访问RMQ控制台
      • Docker-Compose 部署 RocketMQ
        • 拉取镜像
        • NameServer 创建挂载文件夹并授权
        • 拷贝启动脚本文件
        • 修改脚本文件
        • Broker 创建挂载文件夹并授权
        • 创建配置文件`broker.cnf`
        • 拷贝启动脚本文件
        • 修改脚本文件
        • 编写docker-compose.yml文件
        • 启动服务
        • 访问RocketMQ控制台
      • PmHub 实战-客户端配置
        • pmhub-project / workflow的 pom 把 Rocketmq 依赖开启
        • 任务逾期提醒任务开启注释( TaskOverdueNotifyJob )
        • 在 nacos 的 application-dev.yml 中添加以下配置
        • 创建消费者组和 topic
        • 启动 pmhub-project,将 topic 注入到消费组中。建立消息通道
        • 查看RMQ控制台是否建立
        • 重置消费点位
      • PmHub 实战-消息组件搭建
      • **PmHub 实战-任务审批流结果回执&待办**

  • 将项目和任务审批流消息放入 Rocketmq,实现消息的异步解耦,提升系统效率和服务稳定性。

image.png

整个消息队列组件其实就三部分:

  • 生产者:生产消息的一方,例子中的顾客就是生产者
  • 消息队列:就是消息的「篮子」,用来存放消息
  • 消费者:专门负责消息的一方,比如例子中的厨师

消息队列核心的三大使用场景是必须要知道的:解耦、异步、削峰

  • 官网地址:https://rocketmq.apache.org/

  • 开源地址:https://github.com/apache/rocketmq

  • 每个 Broker 与NameServer集群中的所有节点建立长连接, 定时注册 Topic 信息到NameServer

  • Producer 与 NameServer 集群中的一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态

  • Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息

Docker 部署 RocketMQ

拉取 RocketMQ 镜像
docker pull apache/rocketmq:5.1.0
创建容器共享网络
docker network create rocketmq

为什么要创建 docker 共享网络?

  • 容器间通信:创建一个 Docker 网络可以确保同一个网络中的容器可以通过容器名称进行通信,而不需要知道对方的 IP 地址。这对于需要相互通信的服务非常重要,比如 RocketMQ 的多个组件(如 NameServer 和 Broker)。
  • 隔离性和安全性:Docker 网络提供了一个隔离的网络环境,不同网络中的容器彼此隔离。这增加了安全性,防止外部或其他不相关的容器访问敏感服务。
  • 简化配置:使用 Docker 网络,配置变得更加简单。容器可以通过名称互相访问,无需担心容器重启后 IP 地址发生变化。

部署NameServer

创建目录并授予权限
mkdir -p /data/rocketmq/nameserver/{bin,logs}
chmod 777 -R /data/rocketmq/nameserver/*
拷贝启动脚本
# 启动容器NameServer
docker run -d \
--privileged=true --name rmqnamesrv \
apache/rocketmq:5.1.0 sh mqnamesrv

# 拷贝启动脚本
docker cp rmqnamesrv:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh /data/rocketmq/nameserver/bin/
启动容器NameServer
# 删除容器 NameServer
docker rm -f rmqnamesrv

# 启动容器 NameServer
docker run -d --network rocketmq \
--privileged=true --restart=always \
--name rmqnamesrv -p 9876:9876 \
-v /data/rocketmq/nameserver/logs:/home/rocketmq/logs \
-v /data/rocketmq/nameserver/bin/runserver.sh:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh \
apache/rocketmq:5.1.0 sh mqnamesrv

# 部分命令解释 : 
1. -e "MAX_HEAP_SIZE=256M" 设置最大堆内存和堆内存初始大小
2. -e "HEAP_NEWSIZE=128M"  设置新生代内存大小

# 查看启动日志
docker logs -f rmqnamesrv
  • 看到 ‘The Name Server boot success…’, 表示NameServer 已成功启动。

image.png

部署Broker + Proxy

创建挂载文件夹并授权
mkdir -p /data/rocketmq/broker/{store,logs,conf,bin}
chmod 777 -R /data/rocketmq/broker/*
创建broker.cnf文件
vi /data/rocketmq/broker/conf/broker.conf

# nameServer 地址多个用;隔开 默认值null
# 例:127.0.0.1:6666;127.0.0.1:8888 
namesrvAddr = 192.168.100.100:9876
# 集群名称
brokerClusterName = DefaultCluster
# 节点名称
brokerName = broker-a
# broker id节点ID, 0 表示 master, 其他的正整数表示 slave,不能小于0 
brokerId = 0
# Broker服务地址	String	内部使用填内网ip,如果是需要给外部使用填公网ip
brokerIP1 = 192.168.100.100
# Broker角色
brokerRole = ASYNC_MASTER
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# 在每天的什么时间删除已经超过文件保留时间的 commit log,默认值04
deleteWhen = 04
# 以小时计算的文件保留时间 默认值72小时
fileReservedTime = 72
# 是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# 禁用 tsl
tlsTestModeEnable = false

# 下面是没有注释的版本
namesrvAddr = 192.168.100.100:9876
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1 = 192.168.100.100
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
deleteWhen = 04
fileReservedTime = 72
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
tlsTestModeEnable = false
拷贝启动脚本
# 启动 Broker 容器
docker run -d \
--name rmqbroker --privileged=true \
apache/rocketmq:5.1.0 sh mqbroker

# 拷贝脚本文件
docker cp rmqbroker:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh /data/rocketmq/broker/bin
启动容器Broker
# 删除容器 Broker
docker rm -f rmqbroker

# 启动容器 Broker
docker run -d --network rocketmq \
--restart=always --name rmqbroker --privileged=true \
-p 10911:10911 -p 10909:10909 \
-v /data/rocketmq/broker/logs:/root/logs \
-v /data/rocketmq/broker/store:/root/store \
-v /data/rocketmq/broker/conf/broker.conf:/home/rocketmq/broker.conf \
-v /data/rocketmq/broker/bin/runbroker.sh:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
apache/rocketmq:5.1.0 sh mqbroker --enable-proxy -c /home/rocketmq/broker.conf

# 查看启动日志
docker logs -f rmqbroker

image.png

部署RocketMQ控制台(rocketmq-dashboard)

拉取镜像
docker pull apacherocketmq/rocketmq-dashboard:latest
启动容器Rocketmq-dashboard
docker run -d \
--restart=always --name rmq-dashboard \
-p 8080:8080 --network rocketmq \
-e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
apacherocketmq/rocketmq-dashboard
访问RMQ控制台
  • http://192.168.100.100:8080/** ( IP地址改成自己的 )**

image.png

Docker-Compose 部署 RocketMQ

拉取镜像
docker pull apache/rocketmq:5.1.0
docker pull apacherocketmq/rocketmq-dashboard:latest
NameServer 创建挂载文件夹并授权
mkdir -p /usr/local/rocketmq/nameserver/{logs,bin}
chmod 777 -R /usr/local/rocketmq/nameserver/*
拷贝启动脚本文件
docker run -d \
--privileged=true --name mqnamesrv \
apache/rocketmq:5.1.0 sh mqnamesrv

docker cp mqnamesrv:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh /usr/local/rocketmq/nameserver/bin

docker rm -f mqnamesrv
修改脚本文件
vi /usr/local/rocketmq/nameserver/bin/runserver.sh 

image.png

Broker 创建挂载文件夹并授权
mkdir -p /usr/local/rocketmq/broker/{logs,store,conf,bin}
chmod 777 -R /usr/local/rocketmq/broker/*
创建配置文件broker.cnf
vi /usr/local/rocketmq/broker/conf/broker.conf

# 添加如下配置
brokerClusterName = DefaultCluster
brokerName = broker
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 192.168.100.100
tlsTestModeEnable = false
拷贝启动脚本文件
docker run -d \
--name mqbroker --privileged=true \
apache/rocketmq:5.1.0 sh mqbroker

docker cp mqbroker:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh /usr/local/rocketmq/broker/bin

docker rm -f mqbroker
修改脚本文件
vi /usr/local/rocketmq/broker/bin/runbroker.sh 

image.png

编写docker-compose.yml文件
cd /usr/local/rocketmq
vi docker-compose.yml
version: '3.8'
services:
  mqnamesrv:
    image: apache/rocketmq:5.1.0
    container_name: rocketmq-mqnamesrv
    ports:
      - 9876:9876
    restart: always
    privileged: true
    networks:
      - rocketmq
    volumes:
      - /usr/local/rocketmq/nameserver/logs:/home/rocketmq/logs
      - /usr/local/rocketmq/nameserver/bin/runserver.sh:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh
    environment:
      - MAX_HEAP_SIZE=256M
      - HEAP_NEWSIZE=128M
    command: ["sh","mqnamesrv"]
  broker:
    image: apache/rocketmq:5.1.0
    container_name: rocketmq-mqbroker
    ports:
      - 10909:10909
      - 10911:10911
    restart: always
    privileged: true
    networks:
      - rocketmq
    volumes:
      - /usr/local/rocketmq/broker/logs:/home/rocketmq/logs
      - /usr/local/rocketmq/broker/store:/home/rocketmq/logs
      - /usr/local/rocketmq/broker/conf/broker.conf:/home/rocketmq/broker.conf
      - /usr/local/rocketmq/broker/bin/runbroker.sh:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh
    depends_on:
      - 'mqnamesrv'
    environment:
      - NAMESRV_ADDR=rocketmq-mqnamesrv:9876
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=256M
    command: ["sh","mqbroker","-c","/home/rocketmq/broker.conf"]
  proxy:
    image: apache/rocketmq:5.1.0
    container_name: rocketmq-pmproxy
    networks:
      - rocketmq
    depends_on:
      - 'mqnamesrv'
      - 'broker'
    ports:
      - 8080:8080
      - 8081:8081
    restart: on-failure
    environment:
      - NAMESRV_ADDR=rocketmq-mqnamesrv:9876
    command: sh mqproxy
  rmqdashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rmq-dashboard
    ports:
      - 8082:8080
    restart: always
    privileged: true
    networks:
      - rocketmq
    depends_on:
      - 'mqnamesrv'
      - 'broker'
      - 'proxy'
    environment:
      - JAVA_OPTS= -Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=rocketmq-mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false

networks:
  rocketmq:
    driver: bridge
启动服务
# 一键启动
docker compose up -d

# 一键停止所有容器
docker compose stop

# 一键删除所有容器
docker compose rm

# 一键查看所有启动的容器
docker compose ps
访问RocketMQ控制台
  • http://192.168.100.100:8082/** ( IP地址改成自己的 )**

image.png

PmHub 实战-客户端配置

pmhub-project / workflow的 pom 把 Rocketmq 依赖开启

image.png

任务逾期提醒任务开启注释( TaskOverdueNotifyJob )

image.png

在 nacos 的 application-dev.yml 中添加以下配置
# 项目相关配置
pmhub:
  
  # 企微消息相关
  workWx:
    host: https://laigeoffer.cn:7880
    corpid: ww212312313
    corpsecret: DIPuyCcN7C1231231
    addressSecret: eFTxBtSvzUyBO1JCNTT1jfzzRq1OG123123
    agentid: 1000008
    aeskey: txwiJmdUJlC8T5c8oaXm4G3OiM12312
    path:

 # rocketMQ 配置
  rocketMQ:
   # 配置nameserver代理地址
    addr: 192.168.100.100:8081
    topic:
     # 企微消息topic
      wxMessage: pmhub_local
    # 消费者组  
    group:
      wxMessage: PMHUB_GROUP
创建消费者组和 topic

image.png
image.png

image.png
image.png

启动 pmhub-project,将 topic 注入到消费组中。建立消息通道

image.png

查看RMQ控制台是否建立

image.png
image.png

重置消费点位

image.png

至此 rocketmq 的客户端配置完成,也就是说,PmHub 此时已经能正常和 rocketmq 服务端进行通信了,并且已经绑定了 topic 和消费者组,接下来看 PmHub 是如何利用 rocketmq 进行业务布局的吧。

PmHub 实战-消息组件搭建

在 PmHub 中,苍何老师单独抽离了消息组件封装,专门用来进行消息收发,任何服务只需要添加 pmhub-base-notice 即可拥有消息收发能力。组件如下:

image.png

  • 几个核心关键类 :
<!--Rocketmq-企微消息提醒-->
<dependency>
  <groupId>com.laigeoffer.pmhub-cloud</groupId>
  <artifactId>pmhub-base-notice</artifactId>
</dependency>
/**
 * message消费者
 *
 * @author canghe
 * @date 2023/07/21
 */
@Component
public class OAMessageConsumer implements CommandLineRunner {


    /**
     * 微信topic
     * */
    @Value("${pmhub.rocketMQ.topic.wxMessage}")
    private String WX_TOPIC;


    /**
     * 服务器地址
     * */
    @Value("${pmhub.rocketMQ.addr}")
    private String addr;



    /**
     * 消费组
     * */
    @Value("${pmhub.rocketMQ.group.wxMessage}")
    private String WX_CONSUMER_GROUP;

    @Resource
    private RedisService redisService;


    /**
     * 运行注册监听器
     *
     * @param args 参数
     * @throws Exception 异常
     */
    @Override
    public void run(String... args) throws Exception {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(addr)
                .build();

        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        try {
            FilterExpression wxFilterExpression = new FilterExpression(RocketMqUtils.mqTag, FilterExpressionType.TAG);

            PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                    .setClientConfiguration(clientConfiguration)
                    // 设置消费者分组。
                    .setConsumerGroup(WX_CONSUMER_GROUP)
                    // 设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(WX_TOPIC, wxFilterExpression))
                    // 设置消费监听器。
                    .setMessageListener(messageView -> {
                        // 处理消息并返回消费结果。
                        LogFactory.get().info("Consume message successfully, messageId={}", messageView.getMessageId());


                        try {
                            Charset charset = StandardCharsets.UTF_8;
                            String json = charset.decode(messageView.getBody()).toString();

                            ObjectMapper objectMapper = new ObjectMapper();
                            JsonNode jsonNode = objectMapper.readTree(json);
                            String type = jsonNode.get("type").asText();
                            LogFactory.get().info(">>>>>>>>>>>>>>>>>>>>消息类型:"+type);
                            switch (type){
                                case "任务审批提醒":
                                    ProcessRemindDTO processRemindDTO = JSONUtil.toBean(json, ProcessRemindDTO.class);
                                    // 消息幂等性校验(查询redis中同一个 taskId 和 Assignee 是否重复消费)
                                    if (redisService.hasKey(processRemindDTO.getTaskId() + "_" + processRemindDTO.getAssignee())) {
                                        LogFactory.get().info("消息重复消费,instanceId:{}, taskId:{}"+processRemindDTO.getInstId(), processRemindDTO.getTaskId());
                                        return ConsumeResult.FAILURE;
                                    }

                                    // 发送消息
                                    WxResult wxResult =  MessageUtils.sendMessage(processRemindDTO.toWxMessage());

                                    // 信息发送成功,保存message
                                    // cleanMessage(processRemindDTO.getInstId());
                                    MessageDataDTO messageDataDTO = new MessageDataDTO();
                                    messageDataDTO.setMsgCode(wxResult.getResponse_code());
                                    messageDataDTO.setMsgTime(System.currentTimeMillis());
                                    messageDataDTO.setWxUserName(processRemindDTO.getWxUserName());
                                    redisService.setCacheObject(processRemindDTO.getTaskId() + "_" + processRemindDTO.getAssignee(), messageDataDTO);
                                    LogFactory.get().info("新增消息instanceId:{}, taskId:{}"+processRemindDTO.getInstId(), processRemindDTO.getTaskId());
                                    LogFactory.get().info(JSONUtil.toJsonStr(wxResult));
                                    break;
                                 case "审批流结束回执":
                                    // 消息回执
                                    ProcessReturnDTO processReturnDTO = JSONUtil.toBean(json, ProcessReturnDTO.class);
                                    LogFactory.get().info(JSONUtil.toJsonStr(MessageUtils.sendMessage(processReturnDTO.toWxMessage())));
                                    break;
                                case "待办提醒":
                                    // 待办提醒
                                    TodoRemindDTO todoRemindDTO = JSONUtil.toBean(json, TodoRemindDTO.class);
                                    LogFactory.get().info(JSONUtil.toJsonStr(MessageUtils.sendMessage(todoRemindDTO.toWxMessage())));
                                    break;
                                case "任务逾期提醒":
                                    // 任务逾期提醒
                                    TaskOverdueRemindDTO taskOverdueRemindDTO = JSONUtil.toBean(json, TaskOverdueRemindDTO.class);
                                    LogFactory.get().info(JSONUtil.toJsonStr(MessageUtils.sendMessage(taskOverdueRemindDTO.toWxMessage())));
                                    break;
                                case "任务已逾期提醒":
                                    // 任务逾期提醒
                                    TaskOvertimeRemindDTO taskOvertimeRemindDTO = JSONUtil.toBean(json, TaskOvertimeRemindDTO.class);
                                    LogFactory.get().info(JSONUtil.toJsonStr(MessageUtils.sendMessage(taskOvertimeRemindDTO.toWxMessage())));
                                    break;
                                case "任务指派提醒":
                                    // 任务指派提醒
                                    TaskAssignRemindDTO taskAssignRemindDTO = JSONUtil.toBean(json, TaskAssignRemindDTO.class);
                                    LogFactory.get().info(JSONUtil.toJsonStr(MessageUtils.sendMessage(taskAssignRemindDTO.toWxMessage())));
                                    break;
                                default:
                                    break;
                            }
                        }catch (Exception ex){
                            LogFactory.get().error("未知的微信审批提醒消息:");
                            LogFactory.get().error(ex);
                            return ConsumeResult.SUCCESS;
                        }
                        return ConsumeResult.SUCCESS;
                    })
                    .build();
            LogFactory.get().info("企微通知消息RocketMQ通道已建立,TOPIC:" + WX_TOPIC);
        } catch (ClientException e) {
            LogFactory.get().error(e);
        }
    }

}
/**
 * RocketMQ连接工具
 * @author canghe
 */
@Component
public class RocketMqUtils {


    /**
     * 连接地址
     * */
    private static String addr;


    /**
     * 微信topic
     * */
    private static String WX_TOPIC;

    @Value("${pmhub.rocketMQ.addr}")
    private void setAddr(String addr) {
        RocketMqUtils.addr = addr;
    }
    @Value("${pmhub.rocketMQ.topic.wxMessage}")
    public void setWxTopic(String wxMessage) {
        WX_TOPIC = wxMessage;
    }

    /**
     * rocketmq 消息Tag
     * */
    public final static String mqTag = "WX_MASSAGE";

    @Resource
    private RedisService redisService;



    /**
     * 推送到微信topic
     * */
    public static void push2Wx(com.laigeoffer.pmhub.base.notice.domain.entity.Message ob){

        try {

            String key = IdUtil.simpleUUID();
            ObjectMapper objectMapper = new ObjectMapper();
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoint = addr;
            // 消息发送的目标Topic名称,需要提前创建。
            String topic = WX_TOPIC;
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            // 初始化Producer时需要设置通信配置以及预绑定的Topic。
            Producer producer = provider.newProducerBuilder()
                    .setTopics(topic)
                    .setClientConfiguration(configuration)
                    .build();
            // 普通消息发送。
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    .setKeys(key)
                    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                    .setTag(mqTag)
                    // 消息体。
                    .setBody(objectMapper.writeValueAsString(ob).getBytes())
                    .build();

            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            LogFactory.get().info("Send message successfully, messageId={}", sendReceipt.getMessageId());

            producer.close();
        } catch (ClientException | IOException e) {
            LogFactory.get().error("推送微信消息时发生错误:", e);
        }

    }


    public void cleanMessage(String instId){
        LogFactory.get().info("清理消息:" + instId);
        try {
            // 清理消息
            MessageDataDTO messageDataDTO = (MessageDataDTO) redisService.getCacheObject(instId);
            if (messageDataDTO != null) {
                ProcessWxMessageStateUpdateDTO processWxMessageStateUpdateDTO = new ProcessWxMessageStateUpdateDTO();
                processWxMessageStateUpdateDTO.setAtall(1);
                processWxMessageStateUpdateDTO.setResponse_code(messageDataDTO.getMsgCode());
                processWxMessageStateUpdateDTO.getButton().setReplace_name(ButtonStateEnum.FINISH);
                WxResult wxResult =  MessageUtils.updateMessage(processWxMessageStateUpdateDTO);
                LogFactory.get().info(JSONUtil.toJsonStr(wxResult));
                redisService.deleteObject(instId);
                LogFactory.get().info("消息存在");
            }
        } catch (Exception ex){
            LogFactory.get().info("消息不存在");
        }
    }



}

PmHub 实战-任务审批流结果回执&待办

  • 任务审批流结果回执&待办,就是说任务审批到什么状态,

  • 都需要实时监听,并下发通知,其实主要是配置监听器逻辑。

  • 配置监听执行器:

/**
 * 抽象类监听执行器
 * @author canghe
 * @date 2023-07-11 09:35
 */
public abstract class ListenerAbstractExecutor {
    public abstract void push2Wx(ListenerDTO listenerDTO);
}
  • 监听器抽象类工厂:
/**
 * 监听器抽象类工厂
 * @author canghe
 * @date 2023-01-09 09:25
 */
@Service
public class ListenerFactory {

    private static final Map<String, String> beanNames = new ConcurrentHashMap<>();
    static {
        ListenerTypeEnum[] listenerTypeEnums = ListenerTypeEnum.values();
        for (ListenerTypeEnum listenerTypeEnum : listenerTypeEnums) {
            beanNames.put(listenerTypeEnum.getType(), listenerTypeEnum.getBeanName());
        }
    }

    // 通过 Map 注入,通过 spring bean 的名称作为 key 动态获取对应实例
    @Autowired
    private Map<String, ListenerAbstractExecutor> executorMap;
    // 工厂层执行器
    public void execute(String type, ListenerDTO listenerDTO) {
        String beanName = beanNames.get(type);
        if (StringUtils.isEmpty(beanName)) {
            return;
        }
        // 决定最终走哪个类的执行器
        ListenerAbstractExecutor executor = executorMap.get(beanName);
        if (executor == null) {
            return;
        }
        executor.push2Wx(listenerDTO);
    }
}
  • 不同的状态监听执行不同的监听器:
/**
 * @author canghe
 * @date 2023-07-11 09:25
 */
public enum ListenerTypeEnum {

    PROJECT("project", "projectListenerExecutor"),
    TASK("task", "taskListenerExecutor"),
    PURCHASE_INTO("PURCHASE_INTO", "purchaseIntoListenerExecutor"),
    PURCHASE_OUT("PURCHASE_OUT", "purchaseOutListenerExecutor"),
    OTHER_INTO("OTHER_INTO", "otherIntoListenerExecutor"),
    OTHER_OUT("OTHER_OUT", "otherOutListenerExecutor"),
    RETURN_INTO("RETURN_INTO", "returnIntoListenerExecutor"),
    SUPPLIER_APPROVAL("SUPPLIER_APPROVAL", "supplierApprovalListenerExecutor"),
    SCRAPPED_OUT("USELESS_OUT", "scrappedOutListenerExecutor");
    private final String type;
    private final String beanName;
    ListenerTypeEnum(String type, String beanName) {
        this.type = type;
        this.beanName = beanName;
    }

    public String getType() {
        return type;
    }

    public String getBeanName() {
        return beanName;
    }
}

这里的Pmhub实战总结不全, 主要是有些地方调试不通( 笔者菜菜~ ), 就没往下写了…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2044039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

12、springboot3 vue3开发平台-前端-记住我功能实现

文章目录 1. 前端用户信息保存2. 登录页面添加3. 后端实现 1. 前端用户信息保存 使用pinia持久化保存用户名密码 src/stores/remember-me.js // 定义 store import { defineStore } from "pinia" import {reactive} from vueexport const useRememberMeStore defi…

求职Leetcode算法题(7)

1.搜索旋转排序数组 这道题要求时间复杂度为o&#xff08;log n&#xff09;&#xff0c;那么第一时间想到的就是二分法&#xff0c;二分法有个前提条件是在有序数组下&#xff0c;我们发现在这个数组中存在两部分是有序的&#xff0c;所以我们只需要对前半部分和后半部分分别…

element ——tree组件懒加载数据、自定义label、修改高亮样式、回显点击状态

需求 整体宽高占一屏&#xff0c;超出滚动条tree组件点击懒加载每一级数据&#xff0c;一共三级三级节点前加icon&#xff0c;标识是否已学习点击高亮显示背景图横向超出省略显示或者横向滚动条纵向超出纵向滚动条修改其字体和间距☆☆☆☆☆从别的页面跳入回显三级点击状态 …

netsh int tcp show global查看TCP参数

TCP 全局参数 接收方缩放状态 : enabled 接收窗口自动调节级别 : normal 加载项拥塞控制提供程序 : default ECN 功能 : disabled RFC 1323 时间戳 : allowed 初始 RTO : 1000 接收段合并状态 : enabled 非 Sack Rtt 复原 : disabled 最大 SYN 重新传输次数 : 4 快速打开 : en…

CrowdTransfer:在AIoT社区中实现众包知识迁移

这篇论文的标题是《CrowdTransfer: Enabling Crowd Knowledge Transfer in AIoT Community》&#xff0c;由 Yan Liu, Bin Guo, Nuo Li, Yasan Ding, Zhouyangzi Zhang, 和 Zhiwen Yu 等作者共同撰写&#xff0c;发表在《IEEE Communications Surveys & Tutorials》上。以下…

springboot航班进出港管理系统--论文源码调试讲解

第2章 开发环境与技术 本章节对开发航班进出港管理系统管理系统需要搭建的开发环境&#xff0c;还有航班进出港管理系统管理系统开发中使用的编程技术等进行阐述。 2.1 Java语言 Java语言是当今为止依然在编程语言行业具有生命力的常青树之一。Java语言最原始的诞生&#xff…

网络协议--TCP/IP协议栈--三握和四挥

文章目录 网络设备交换机交换机的工作原理 路由器路由器工作原理 TCP/IP协议栈TCP/IP四层模型TCP/IP通信过程TCP特性TCP包头结构源端口、目标端口序列号(seq)确认号(小ack)标记位 TCP协议端口号端口号分类ssh服务nc工具抓包 socket套接字端口占用 三次握手Wireshark抓包tcpdump…

构建完美人工智能工程师培养计划

一、理论基础构建 1. 数学与统计学基础&#xff1a;作为AI的基石&#xff0c;扎实的数学与统计学知识不可或缺。培养计划应涵盖高等数学、线性代数、概率论与数理统计、优化理论等课程&#xff0c;为学员打下坚实的理论基础。 2. 计算机科学基础&#xff1a;包括数据结构、算…

DLT645-2007通信协议---读取解析智能电表数据

一、DLT645-2007通讯协议 DLT645-2007是中国电力行业规定的一种智能电表通信协议&#xff0c;主要用于电能表与数据采集设备之间的通信。DLT645-2007协议定义了电能表与数据采集设备之间的数据格式、通信方式、命令集等内容&#xff0c;用于实现电能表数据的采集、传输和管理。…

SpringBoot整合Liquibase

1、是什么&#xff1f; Liquibase官网 Liquibase是一个开源的数据库管理工具&#xff0c;可以帮助开发人员管理和跟踪数据库变更。它可以与各种关系型数据库和NoSQL数据库一起使用&#xff0c;并提供多种数据库任务自动化功能&#xff0c;例如数据库迁移、版本控制和监控。Li…

盲盒抽奖源码

介绍&#xff1a; 功能上还可以,商品和盲盒可以在你程序里添加&#xff0c;设置概率等!! 新盲盒星球抽奖商城手机网站源码 随机开箱抢购 代码有点大&#xff0c;三百多M。 教程搭建很简单&#xff0c;基本10分钟搭建一套&#xff0c;可一个服务器搭建多套&#xff0c;只要你…

【时时三省】(C语言基础)模拟实现字符串相关函数

山不在高&#xff0c;有仙则名。水不在深&#xff0c;有龙则灵。 ----CSDN 时时三省 模拟实现库函数:strcpy 示例: const修饰指针 示例: const 修饰变量&#xff0c;这个变量为常变量&#xff0c;不能被修改&#xff0c;但本质上还是变量 正常num&#xff1d;20是改不了它…

招聘管理型岗位,HR会考察候选人的哪些方面?

团队管理能力 团队管理能力可以说是管理型岗位最基本的要求&#xff0c;只有具备优秀的团队管理能力&#xff0c;才能够带领团队实现组织目标&#xff0c;提高团队凝聚力&#xff0c;而想要考察一个人是否具备团队管理能力&#xff0c;就要通过多方面来测试。可以先了解一下候…

CSS笔记总结:第五天(HTML+CSS笔记完结)

Xmind鸟瞰图&#xff1a; 简单文字总结&#xff1a; css知识总结&#xff1a; 元素的显示与隐藏&#xff1a; 1.通过display隐藏元素 不保留位置 2.通过visibility 隐藏元素 保留位置 3.overflow 溢出隐藏 鼠标样式cursor&#xff1a; 1.defauly小白 2.p…

走进 keepalived:解析高可用架构背后的关键技术

一、什么是keepalived Keepalived 是一个用于实现服务器高可用性&#xff08;High Availability&#xff0c;简称 HA&#xff09;的软件。 简单来说&#xff0c;它的主要作用是检测服务器的状态&#xff0c;并在主服务器出现故障时&#xff0c;自动将服务切换到备份服务器上&…

SVN权限控制解析

一、基础数据说明 1. 代码目录存在多级 2. 角色存在多级 二、规则说明 结合例子讲规则 1、多级文件夹 a. 继承与覆盖 【文件夹层级】&#xff1a; Repositories/BS_Projects/科顺 BS_Projects包含了多个项目&#xff0c;每个项目是一个文件夹&#xff0c;比如“科顺”是其…

分布式事务Seata保证审批状态一致性

文章目录 下载安装Seata创建对应数据库修改application.yml相应配置启动SeataPmHub 实战——添加任务事务管理业务库添加undo_log 表对应服务加上对应的seata依赖Nacos 配置文件 pmhub-project-dev.yml 添加 seata 配置&#xff1a;接口添加 GlobalTransactional 注解涉及数据表…

Centos7升级gitlab(17)

在 CentOS 7 中将 GitLab 从版本 17.1.1 升级到 17.2.2&#xff0c;涉及以下步骤。请务必在升级前备份数据&#xff0c;以防止升级过程中出现问题导致数据丢失。 升级步骤 1. 备份 GitLab 数据 在升级之前&#xff0c;确保已经备份了 GitLab 的数据&#xff0c;包括数据库、…

【windows安装gradle】

1.去官网下载自己需要的版本。 2.直接解压到指定目录 3.配置环境变量 3.1.新建 GRADLE_HOME 环境变量值指向你的 Gradle 的解压路径 3.2.将 %GRADLE_HOME%\bin 添加到 Path 环境变量中 4.打开cmd命令行输入gradle -v查看是否安装成功以及当前版本 下面显示说明已经安装完成了…

软件测试用例的编写(六)

软件测试用例 定义 测试用例&#xff08;TestCase&#xff09;是为项目需求而编制的一组测试输入&#xff0c;执行步骤&#xff0c;以及预期结果&#xff0c;以便测试某个程序是否满足客户需求 可以总结为&#xff1a;每一个测试点的数据设计和步骤设计 – 对测试点的细化 作…