SpringBoot项目实现发布订阅模式,真的很简单

news2025/1/2 0:26:39

大家好,我是老三,在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……

这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。

​下单之后的逻辑

所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。

​观察者模式

一开始,我们都是自己去写观察者模式。

自己实现观察者模式

观察者简图

观察者

  • 观察者定义接口

/**
 * @Author: fighter3
 * @Description: 观察者接口
 * @Date: 2022/11/7 11:40 下午
 */
public interface OrderObserver {

    void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}
  • 具体观察者@Slf4j
    public class OrderMetricsObserver implements OrderObserver {
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] metrics");
    }}@Slf4j
    public class OrderLogObserver implements OrderObserver{
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] log.");
    }}@Slf4j
    public class OrderNotifyObserver implements OrderObserver{
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] notify.");
    }}
    • 业务通知观察者
    • 日志记录观察者
    • 监控埋点观察者

被观察者

  • 消息实体定义
@Data
public class PlaceOrderMessage implements Serializable {
    /**
     * 订单号
     */
    private String orderId;
    /**
     * 订单状态
     */
    private Integer orderStatus;
    /**
     * 下单用户ID
     */
    private String userId;
    //……
}
  • 被观察者抽象类
public abstract class OrderSubject {
    //定义一个观察者列表
    private List<OrderObserver> orderObserverList = new ArrayList<>();
    //定义一个线程池,这里参数随便写的
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

    //增加一个观察者
    public void addObserver(OrderObserver o) {
        this.orderObserverList.add(o);
    }

    //删除一个观察者
    public void delObserver(OrderObserver o) {
        this.orderObserverList.remove(o);
    }

    //通知所有观察者
    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
        for (OrderObserver orderObserver : orderObserverList) {
            //利用多线程异步执行
            threadPoolExecutor.execute(() -> {
                orderObserver.afterPlaceOrder(placeOrderMessage);
            });
        }
    }
}

这里利用了多线程,来异步执行观察者。

  • 被观察者实现类
/**
 * @Author: fighter3
 * @Description: 订单实现类-被观察者实现类
 * @Date: 2022/11/7 11:52 下午
 */
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {

    /**
     * 下单
     */
    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //添加观察者
        this.addObserver(new OrderMetricsObserver());
        this.addObserver(new OrderLogObserver());
        this.addObserver(new OrderNotifyObserver());
        //通知观察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}

测试

    @Test
    @DisplayName("下单")
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }
  • 测试执行结果
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver        : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618  INFO 20235 --- [           main] cn.fighter3.obverser.OrderServiceImpl    : [placeOrder] end.
2022-11-08 00:11:13.618  INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver  : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver    : [afterPlaceOrder] log.

可以看到,观察者是异步执行的。

利用Spring精简

可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。

Spring精简观察者模式

观察者实现类:定义成Bean

@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {

    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] metrics");
    }
}

@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {

    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] notify.");
    }
}

被观察者:自动注入Bean

@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {

    /**
     * 实现类里也要注入一下
     */
    @Autowired
    private List<OrderObserver> orderObserverList;

    /**
     * 下单
     */
    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //通知观察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}

这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。

Spring Event实现发布/订阅模式

Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。

自定义事件

public class PlaceOrderEvent extends ApplicationEvent {

    public PlaceOrderEvent(PlaceOrderEventMessage source) {
        super(source);
    }
}

事件监听者

事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。

  • OrderLogObserver@Slf4j
    @Service
    public class OrderLogObserver implements OrderObserver {
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] log.");
    }}
  • OrderMetricsObserver
  • OrderNotifyObserver
  • OrderSubjectpublic abstract class OrderSubject {
    /**
    * 利用Spring的特性直接注入观察者*/

    @Autowired
    protected List<OrderObserver> orderObserverList;
    //定义一个线程池,这里参数随便写的
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
    //通知所有观察者
    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
    for (OrderObserver orderObserver : orderObserverList) {
    //利用多线程异步执行
    threadPoolExecutor.execute(() -> {orderObserver.afterPlaceOrder(placeOrderMessage);});}}}
  • OrderServiceImpl
  • PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。
  • PlaceOrderEventMessage:事件消息,定义了事件的消息体。
  • @Data
    public class PlaceOrderEventMessage implements Serializable {
        /**
         * 订单号
         */
        private String orderId;
        /**
         * 订单状态
         */
        private Integer orderStatus;
        /**
         * 下单用户ID
         */
        private String userId;
        //……
    }
    

​事件监听者实现

实现ApplicationListener接口

实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。

  • OrderLogListener

@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}
  • OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] metrics");
    }
}
  • OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {

    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] notify.");
    }
}

使用@EventListener注解

使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。

  • OrderLogListener@Slf4j
    @Service
    public class OrderLogListener {
    @EventListener
    public void orderLog(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] log.");
    }}
  • OrderMetricsListener@Slf4j
    @Service
    public class OrderMetricsListener {
    @EventListener
    public void metrics(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] metrics");
    }}
  • OrderNotifyListener@Slf4j
    @Service
    public class OrderNotifyListener{
    @EventListener
    public void notify(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] notify.");
    }}

异步和自定义线程池

异步执行

异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:

  • OrderLogListener
@Slf4j
@Service
public class OrderLogListener  {

    @EventListener
    @Async
    public void orderLog(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}

当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。

@SpringBootApplication
@EnableAsync
public class DailyApplication {

    public static void main(String[] args) {
        SpringApplication.run(DairlyLearnApplication.class, args);
    }

}

自定义线程池

使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

自定义线程池有三种方式:

@Async自定义线程池

  • 实现接口AsyncConfigurer

  • 继承AsyncConfigurerSupport

  • 配置由自定义的TaskExecutor替代内置的任务执行器

我们来看看三种写法:

  • 实现接口AsyncConfigurer

@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean("fighter3AsyncExecutor")
    public ThreadPoolTaskExecutor executor() {
        //Spring封装的一个线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //随便写的一些配置
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(30);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("fighter3AsyncExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}

  • 继承AsyncConfigurerSupport
@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //随便写的一些配置
        threadPool.setCorePoolSize(10);
        threadPool.setMaxPoolSize(30);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        return threadPool;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}
  • 配置自定义的TaskExecutor@Slf4j
    @Service
    public class OrderLogListener {
    @EventListener
    @Async("asyncExecutor")
    public void orderLog(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] log.");
    }}
    • 配置线程池@Configuration
      public class TaskPoolConfig {
      @Bean(name = "asyncExecutor")
      public Executor taskExecutor() {
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
      //随便写的一些配置
      executor.setCorePoolSize(10);
      executor.setMaxPoolSize(20);
      executor.setQueueCapacity(200);
      executor.setKeepAliveSeconds(60);
      executor.setThreadNamePrefix("asyncExecutor-");
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      return executor;
      }}
    • 使用@Async注解的时候,指定线程池,推荐使用这种方式,因为在项目里,尽量做到线程池隔离,不同的任务使用不同的线程池

异步和自定义线程池这一部分只是一些扩展,稍微占了一些篇幅,大家可不要觉得Spring Event用起来很繁琐。

发布事件

发布事件也非常简单,只需要使用Spring 提供的ApplicationEventPublisher来发布自定义事件。

  • OrderServiceImpl@Service
    @Slf4j
    public class OrderServiceImpl implements OrderService {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    /**
    * 下单*/

    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
    log.info("[placeOrder] start.");
    PlaceOrderResVO resVO = new PlaceOrderResVO();
    //消息
    PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
    //发布事件
    applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
    log.info("[placeOrder] end.");
    return resVO;
    }}

在Idea里查看事件的监听者也比较方便,点击下面图中的图标,就可以查看监听者。

查看监听者

​监听者

测试

最后,我们还是测试一下。

   @Test
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }
  • 执行结果
2022-11-08 10:05:14.415  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] start.
2022-11-08 10:05:14.424  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] end.
2022-11-08 10:05:14.434  INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener    : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435  INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener   : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436  INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener       : [afterPlaceOrder] log.

可以看到,异步执行,而且用到了我们自定义的线程池。

小结

这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/62153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python中的集合详解

目录 一.思考 二.集合 基本语法 集合的常用操作——修改 1.添加新元素 2.移除元素 3.从集合中随机取出元素 4.清空集合 5. 消除两个集合的差集 6.两个集合的合并 注意&#xff1a;集合的遍历 三.集合总结 一.思考 为什么使用集合&#xff1f; 我们目前接触到了列表、元…

php后端+JQuery+Ajax简单表单提交

通过ajax,如果从后端直接想前端返回数组,那前端收到的是一个‘Array’的字符串。所以,我比较习惯的是用json对象的格式。由后端通过json_encode()函数,把数组封装成对象,传递到前端;前端也以json的格式接收。这里用提交表单来举例说明。 页面显示如下: JQueryAjax.…

Kafka服务端参数配置

$KAFKA_HOME/config/server.properties文件中的配置 1、zookeeper.connect 该参数用于配置Kafka要连接的Zookeeper/集群的地址。它的值是一个字符串&#xff0c;使用逗号分隔Zookeeper的多个地址。Zookeeper的单个地址是host:port形式的&#xff0c;可以在最后添加Kafka在Zoo…

idea+docker+jenkins+git构建自动化部署java项目

ideadockerjenkinsgit构建自动化部署java项目 默认jenkins已经 安装jdk 9 和maven 3.5 用于 执行pom.xml 打包构建镜像 [rootECS40833040 ~]# mkdir -p /usr/local/jenkins [rootECS40833040 jenkins]# vim jenkins.sh #!/usr/bin/env bash app_name005-springboot docker …

2、JSP——配置Tomcat服务器

目录 1、下载Tomcat服务器 2、Tomcat服务器的安装 3、Tomcat的目录结构 4、配置Tomcat运行环境 4.1 右击此电脑--->属性--->高级系统设置--->环境变量 4.2 在系统变量(s)中新建--->CATALINA_HOME--->变量值为Tomcat压缩文件解压后的地址 4.3 在系统变量(s…

[附源码]计算机毕业设计线上评分分享平台Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

C语言 动态内存管理

C语言 动态内存管理引言C/C程序的内存开辟声明一、动态内存管理函数1. malloc 和 free使用示例2. calloc使用示例3. realloc使用示例4. 注意事项二、动态内存管理函数使用时的错误示范程序清单1程序清单2程序清单3程序清单4程序清单5程序清单6三、经典的内存笔试题程序清单1程序…

【可转债,股票】低频量化数据

目录历史文章股票明日涨停预测指数60日线偏离数据上证指数 MA60偏离度深证成指 MA60偏离度创业板指 MA60偏离度中小100 MA60偏离度上证50 MA60偏离度沪深300 MA60偏离度中证500 MA60偏离度中证1000 MA60偏离度科创50 MA60偏离度恒生科技 MA60偏离度恒生科技指数 MA60偏离度可转…

【强化学习论文合集 | 2018年合集】二. AAAI-2018 强化学习论文

强化学习(Reinforcement Learning, RL),又称再励学习、评价学习或增强学习,是机器学习的范式和方法论之一,用于描述和解决智能体(agent)在与环境的交互过程中通过学习策略以达成回报最大化或实现特定目标的问题。 本专栏整理了近几年国际顶级会议中,涉及强化学习(Rein…

Pytorch2.0发布了,向下兼容,加一句代码,性能翻番

概述 介绍PyTorch 2.0&#xff0c;我们迈向PyTorch下一代2系列发行版的第一步。在过去的几年里&#xff0c;我们进行了创新和迭代&#xff0c;从PyTorch 1.0到最近的1.13&#xff0c;并转移到新成立的PyTorch基金会&#xff0c;它是Linux基金会的一部分。 除了我们令人惊叹的…

Python中的全局变量与命名法

--------------------------------------------------------------------------------------------------------------------------------- 在本文章中&#xff0c;我们来讨论一下python中的全局变量&#xff0c;我们将学习如何定义全局变量&#xff0c;然后如何在函数中访问它…

HTML静态网页作业——澳门英文旅游网站设计与实现HTML+CSS+JavaScript

&#x1f468;‍&#x1f393;学生HTML静态网页基础水平制作&#x1f469;‍&#x1f393;&#xff0c;页面排版干净简洁。使用HTMLCSS页面布局设计,web大学生网页设计作业源码&#xff0c;这是一个不错的旅游网页制作&#xff0c;画面精明&#xff0c;排版整洁&#xff0c;内容…

在windows server 2016安装Web服务器(IIS)

无论是用.NET开发网站和Web服务&#xff0c;都需要发布于Web服务器&#xff08;IIS&#xff09;上,以下记录在windows server 2016 标准版上安装Web服务器&#xff08;IIS&#xff09;的过程。 1.打开服务器管理器&#xff0c;选择“添加角色和功能” 2.确认管理员帐户为强密码…

[附源码]计算机毕业设计基于springboot的汽车租赁系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Android S(31) PMS 服务启动原理

1、PMS 启动 SystemServer进程启动引导服务时启动PMS,方法调用SystemServer.run()-startBootstrapServices&#xff0c;通过 PackageManagerService.main中 new PackageManagerService()实例化PackageManagerService对象。 PMS 的主要作用是解析 system/和 data/ 目录下的apk…

讲讲我是如何一步步成为CSDN博客专家的心路历程

大家好&#xff0c;给大家先做个自我介绍 我是码上代码&#xff0c;大家可以叫我码哥 我也是一个普通本科毕业的最普通学生&#xff0c;我相信大部分程序员或者想从事程序员行业的都是普通家庭的孩子&#xff0c;所以我也是靠自己的努力&#xff0c;从毕业入职到一家传统企业&a…

pytorch 学习第三天 交叉熵

交叉熵 信息量 假设X是一个离散型随机变量&#xff0c;其取值集合为X&#xff0c;概率分布函数为p(x)Pr(Xx),x∈X 我们定义事件Xx0的信息量为&#xff1a;I(x0)−log(p(x0))&#xff0c;可以理解为&#xff0c;一个事件发生的概率越大&#xff0c;则它所携带的信息量就越小&a…

计算机网络(自顶向下)—第四章测验题

“计算机网络”第四章测验题 一个 B 类网络 128.16.0.0/16 被网络管理员划分为 16 个大小相同的子网&#xff0c;则子网掩码为255.255.240.0。如果按照 IP 地址从小到大对子网进行编号&#xff0c;写出第2 个子网的地址范围&#xff0c;用 a.b.c.d/x 的形式表示128.16.16.0/20。…

阿里最新产,SpringCloud 微服务核心技术全解手册 Github 星标 50k

SpringCloud 想必每一位 Java 程序员都不会陌生&#xff0c;很多人一度把他称之为“微服务全家桶”&#xff0c;它通过简单的注解&#xff0c;就能快速地架构微服务&#xff0c;这也是 SpringCloud 的最大优势。但是最近有去面试过的朋友就会发现&#xff0c;现在面试你要是没有…

C语言第十七课:初阶指针

目录 前言&#xff1a; 一、指针是什么&#xff1a; 1.那么指针到底是什么呢&#xff1f; 2.内存中的数据存储原理&#xff1a; 3.数据存储与指针使用实例&#xff1a; 4.存储编址原理&#xff1a; 二、指针和指针类型&#xff1a; 1.决定了指针的步长&#xff1a; 2.决定了…