基于SpringBoot实现一个可扩展的事件总线

news2025/2/24 16:29:29

基于SpringBoot实现一个可扩展的事件总线

前言

在日常开发中,我们经常会用到事件总线,SpringBoot通过事件多播器的形式为我们提供了一个事件总线,但是在开发中我们经常会用到其他的实现,比如Guava、Disruptor的。我们将基于SpringBoot封装一套底层驱动可扩展的,统一api的事件驱动组件。

环境准备

jdk1.8
spring-boot-autoconfigure
Guava
Disruptor

pom文件如下

Copy<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.billetsdoux</groupId>
    <artifactId>eventBus</artifactId>
    <version>1.0.0</version>
    <name>eventBus</name>
    <description>eventBus</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.1-jre</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <scope>compile</scope>
            <version>5.8.9</version>
        </dependency>

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>



</project>


组件介绍

整体架构#

目录结构如下:

[外链图片转存中…(img-m0HXGURP-1703487623977)]

我们的核心是一个EventListenerRegistry,它便是我们提供统一api的入口,它有两个方法,一个是init方法,在SpringBoot容器启动的时候会去注册我们所有的事件监听器,publish 方法则为事件发布的方法。这里我为它提供了3种实现,GuavaSpringDisruptor

[外链图片转存中…(img-AtJntQUE-1703487623978)]

EventModel#

这是我们定义的事件模型,topic为事件主题,我们通过不同的topic对应不同的事件处理器,entity为具体的事件对象模型

Copypackage com.billetsdoux.eventbus.model;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
public class EventModel<T> implements Serializable {

    /**
     *  事件发布主题
     */
    private String topic;

    /**
     *  事件对象模型
     */
    private T entity;
}


EventListener#

EventListener 为事件消费接口,定义了2个方法,topic() 为监听的事件topic,onMessage()为事件的回调接口。

Copypackage com.billetsdoux.eventbus;

/**
 *  消费接口
 */
public interface EventListener<T> {

    String topic();

    void onMessage(T message);
}


EventListenerRegistry#

这边是我们之前介绍的事件核心接口,它提供两个接口 initRegistryEventListener 负责注册我们所定义的所有事件监听器,publish 负责发送消息,我们底层的驱动需要继承这个接口。

Copypublic interface EventListenerRegistry<P> {
    void initRegistryEventListener(List<EventListener> eventConsumerList);

    void publish(P param);
}


SpringEventListenerRegistry#

这是我们通过Spring为我们提供的消息多播器来实现的一个事件驱动。这个类被@Component标记,那么它会在容器启动的时候,通过构造器为我们注入 eventListeners ,applicationContext 。eventListeners 为所有实现了EventListener接口,并被注入到容器里面的类。

initRegistryEventListener 这是一个空方法,因为他们本身已经在容器中了,所以不需要注册了

publish: 直接调用applicationContext.publishEvent就可以了。

Copypackage com.billetsdoux.eventbus.spring;
import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

@RequiredArgsConstructor
@Slf4j
@Component
public class SpringEventListenerRegistry implements EventListenerRegistry<EventModel> {


    final ApplicationContext applicationContext;

    final List<EventListener> eventListeners;



    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {

    }

    @Override
    public void publish(EventModel param) {
        applicationContext.publishEvent(param);
    }

    @PostConstruct
    public void init(){
        log.info("开始初始化Spring事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Spring事件监听器的组件服务");
    }
}


GuavaEventListenerRegistry#

基于Guava来实现的事件总线,我们首先还是需要容器帮我们注入eventListeners。相较于Spring我们需要自己定义一个Guava的EventBus,然后把我们的Listener注册到这个EventBus中。

publish方法则是调用EventBus的post方法到。

Copy
package com.billetsdoux.eventbus.guava;

import cn.hutool.core.thread.ThreadUtil;

import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.objenesis.instantiator.util.ClassUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.ExecutorService;

@Component("guava")
@Slf4j
public class GuavaEventListenerRegistry implements EventListenerRegistry<EventModel> {

    EventBus eventBus;

    final List<EventListener> eventListeners;

    public GuavaEventListenerRegistry(List<EventListener> eventListeners) {
        this.eventListeners = eventListeners;
    }

    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {
        final ExecutorService executor = ThreadUtil.newExecutor(10, 20, 300);
        eventBus = new AsyncEventBus(GuavaEventListenerRegistry.class.getName(),executor);
        eventConsumerList.forEach(param->{
            log.info("注册监听器:{}",param.getClass().getName());
            eventBus.register(ClassUtils.newInstance(param.getClass()));
        });
    }

    @Override
    public void publish(EventModel param) {
        eventBus.post(param);
    }

    @PostConstruct
    public void init(){

        log.info("开始初始化Guava事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Guava事件监听器的组件服务");
    }

}


DisruptorEventListenerRegistry#

Disruptor的实现相对来说麻烦一点,它首先需要一个实现了EventFactory接口的类,它提供一个newInstance接口来创建事件对象模型。

具体的使用方式可以参考我这篇博文:Disruptor入门

EventModelFactory

我们首先还是需要注入我们的Listener,只是这里在init的时候是将我们的Listener交给我们的Disruptor去处理,我们先将Listener转成EventHandler,所以我们的监听器接口具体实现的时候除了实现我们定义的EventListener接口外还需要继承Disruptor的EventHandler接口。 调用disruptor.handleEventsWith(dataListener); 把我们的Listener交给Disruptor去管理。最后再启动Disruptor。

publish:调用Disruptor的RingBuffer来进行消息的发送。

Copy
/**
 *  事件工厂
 *  Disruptor 通过EventFactory在RingBuffer中预创建Event的实例
 * @param <T>
 */
public class EventModelFactory<T> implements EventFactory<EventModel<T>> {
    @Override
    public EventModel<T> newInstance() {
        return new EventModel<>();
    }
}


Copy@Slf4j
@RequiredArgsConstructor
@Component("disruptor")
@Scope("prototype") // 线程安全问题
public class DisruptorEventListenerRegistry implements EventListenerRegistry<EventModel>,AutoCloseable {

    /**
     *  disruptor事件处理器
     */
    @Getter
    @Setter
    private Disruptor<EventModel> disruptor;

    @NonNull
    final List<EventListener> eventListeners;



    /**
     *  RingBuffer的大小
     */
    private final int DEFAULT_RING_SIZE = 1024 * 1024;

    /**
     *  事件工厂
     */
    private EventFactory<EventModel> eventFactory = new EventModelFactory();

    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {

        disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_SIZE, createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        EventHandler[] dataListener = eventConsumerList.stream().map(param -> {
            EventListener<EventModel> eventModelEventListener = param;
            return eventModelEventListener;
        }).collect(Collectors.toList()).toArray(new EventHandler[eventConsumerList.size()]);
        log.info("注册服务信息接口:{}",dataListener);

        disruptor.handleEventsWith(dataListener);
        disruptor.start();
    }

    @Override
    public void publish(EventModel param) {
        publishEvent(param);
    }

    public void publishEvent(EventModel... eventModels){
        Objects.requireNonNull(disruptor, "当前disruptor核心控制器不可以为null");
        Objects.requireNonNull(eventModels, "当前eventModels事件控制器不可以为null");

        // 发布事件
        final RingBuffer<EventModel> ringBuffer = disruptor.getRingBuffer();
        try {
            final List<EventModel> dataList = Arrays.stream(eventModels).collect(Collectors.toList());

            for (EventModel element : dataList) {

                // 请求下一个序号
                long sequence = ringBuffer.next();

                // 获取该序号对应的事件对象
                EventModel event =  ringBuffer.get(sequence);
                event.setTopic(element.getTopic());
                event.setEntity(element.getEntity());
                ringBuffer.publish(sequence);

            }
        }catch (Exception e) {
            log.error("error",e);
        }

    }


    /**
     *  关闭处理机制
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if (Objects.nonNull(disruptor)) disruptor.shutdown();

    }

    @PostConstruct
    public void init(){
        log.info("开始初始化Disruptor事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Disruptor事件监听器的组件服务");
    }

    private static ThreadFactory createThreadFactory(){
        AtomicInteger integer = new AtomicInteger();

        return r-> new Thread(r,"disruptor-"+integer.incrementAndGet());
    }
    
    
}


至此我们已经实现了我们的目标三个EventListenerRegistry,我们接下来看看我们Listener如何实现。

BaseEventListener#

我们刚说过我们的Listener需要同时实现EventHandler跟EventListener,所以我们定义一个抽象类,注意这个EventListener是我们定义的,EventHandler是Disruptor定义的。

Copypublic abstract class BaseEventListener<T> implements EventListener<T>, EventHandler<T> {

}


ExecutableEventListener#

我们定义一个抽象类ExecutableEventListener 我们来实现一下里面的方法。

对于Spring跟Guava来说只需要在方法上添加注解便可以在事件发生的时候回调过来,而对于Disruptor来说它的回调是继承EventHandler里面的onEvent方法。所以我们在onEvent里面手动调用onMessage方法,让所有的消息都转发给onMessage处理。

@org.springframework.context.event.EventListener Spring的回调注解

@Subscribe 的回调注解

onMessage:我们先调用topic()方法获取Listener方法的topic,这个方法我们这里先不实现,交给具体的实现类去实现这个方法。我们再定义一个handle的抽象方法,则是我们具体的消息处理逻辑的方法,也交给具体的实现类去实现。

Copy@Slf4j
public abstract class ExecutableEventListener extends BaseEventListener<EventModel<?>> {

    @org.springframework.context.event.EventListener
    @Subscribe
    @Override
    public void onMessage(EventModel<?> message) {
        log.info("收到消息:{}",message);

        if (topic().equals(message.getTopic())){
            handle(message);
        }
    }

    @Override
    public void onEvent(EventModel<?> event, long sequence, boolean endOfBatch) throws Exception {
        onMessage(event);
    }

    /**
     *  具体消息处理方法
     * @param message
     */
    protected abstract void handle(EventModel<?> message);

}


至此我们的核心代码就开发完成了,现在定义两个注解,让我们能够在项目中启用它。

EnableEventBus:在启动类上添加这个注解以启用EventBus

[外链图片转存中…(img-OjGimDb2-1703487623978)]

EventBusConfiguration:配置一下Spring的包扫描路径
img

测试

我们把我们刚写的项目install到本地maven仓库,以便我们在项目中能够引用它。我们新建一个SpringBootWeb项目添加这个依赖测试下

在pom中添加

Copy  <dependency>
            <groupId>com.billetsdoux</groupId>
            <artifactId>eventBus</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

在启动类上添加这个注解启用

img

添加一个事件监听器,继承ExecutableEventListener,监听”blog“消息的主题。

img

添加一个Controller来测试一下我们的事件总线。它根据我们的type来选择不同的底层驱动:spring,guava,disruptor

[外链图片转存中…(img-ZndsBWCc-1703487623979)]

我们打包成docker镜像然后启动。
Dockerfile如下:

CopyFROM openjdk:8-jre-slim
MAINTAINER billtsdoux

WORKDIR /app

ADD target/eventbus_blog*.jar app.jar

EXPOSE 8080

ENV JVM_OPTS="-Xmx256m -Xms256m" \
    TZ=Asia/Shanghai

RUN ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \
    && echo $TZ > /etc/timezone

ENTRYPOINT ["sh","-c","java -jar $JVM_OPTS app.jar"]



我们这里配置一下打包后镜像的名称,已经启动的容器名称跟监听的端口。

[外链图片转存中…(img-xKzuGe5r-1703487623980)]

构建成功

img

并且也启动一个容器:

[外链图片转存中…(img-QETw4G24-1703487623980)]

查看日志可以看到我们内置的三个监听器注册器已经成功启动了。

[外链图片转存中…(img-ScQl7W0a-1703487623980)]

我们测试一下接口:可以看到根据我们选择的不同类型我们可以选择不同的实现。

[外链图片转存中…(img-aFD49ezm-1703487623981)]

[外链图片转存中…(img-CnPONtYE-1703487623981)]

[外链图片转存中…(img-mDQW65LN-1703487623981)]

后言

如果提供这个三个不够用,我们还可以通过实现这个接口EventListenerRegistry来扩展我们的事件总线组件,再注入到容器中,在调用的时候选择具体的实现就好了。

标签: java , Spring

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1334658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

迪杰斯特拉算法详解

迪杰斯特拉算法详解 首先要知道的是&#xff0c;迪杰斯特拉算法是求解单源最短路径的&#xff0c;就是在一个图中&#xff08;无向图和有向图均可&#xff09;&#xff0c;指定一个源点&#xff0c;求出来这个源点到其他各个节点的最短路径。 存图 首先&#xff0c;我需要用…

HarmonyOS应用兼容稳定性云测试

兼容性测试 兼容性测试主要验证HarmonyOS应用在华为真机设备上运行的兼容性问题&#xff0c;包括首次安装、再次安装、启动、卸载、崩溃、黑白屏、闪退、运行错误、无法回退、无响应、设计约束场景。具体兼容性测试项的详细说明请参考兼容性测试标准。 兼容性测试支持TV、智能穿…

爬虫系列----Python解析Json网页并保存到本地csv

Python解析JSON 1 知识小课堂1.1 爬虫1.2 JSON1.3 Python1.4 前言技术1.4.1 range1.4.2 random1.4.3 time.sleep1.4.4 with open() as f: 2 解析过程2.1 简介2.2 打开调试工具2.3 分析网址2.3.1 网址的规律2.3.2 网址的参数 2.4 爬取第一页内容2.5 存入字典并获取2.6 循环主体数…

DolphinScheduler 介绍及系统架构

目录 一、DolphinScheduler 介绍 1.1 关于 DolphinScheduler 1.2 特性 简单易用 丰富的使用场景 High Reliability High Scalability 1.3 名词解释 1.3.1 名词解释 1.3.2 模块介绍 二、DolphinScheduler 系统架构 2.1 系统架构图 2.2 架构说明 MasterServer 该服…

【Java核心基础】一文带你了解Java中super关键字的重要作用

“super”关键字在编程中扮演着重要角色&#xff0c;它允许我们直接访问父类中的属性、方法或构造函数&#xff0c;即使子类中存在同名元素。此外&#xff0c;“super()”在子类构造函数中调用父类初始化操作&#xff0c;确保父类属性正确初始化。有时&#xff0c;“super”还可…

Python 爬虫之下载视频(五)

爬取第三方网站视频 文章目录 爬取第三方网站视频前言一、基本情况二、基本思路三、代码编写四、注意事项&#xff08;ffmpeg&#xff09;总结 前言 国内主流的视频平台有点难。。。就暂且记录一些三方视频平台的爬取吧。比如下面这个&#xff1a; 一、基本情况 这次爬取的方…

OpenHarmony之内核层解析~

OpenHarmony简介 技术架构 OpenHarmony整体遵从分层设计&#xff0c;从下向上依次为&#xff1a;内核层、系统服务层、框架层和应用层。系统功能按照“系统 > 子系统 > 组件”逐级展开&#xff0c;在多设备部署场景下&#xff0c;支持根据实际需求裁剪某些非必要的组件…

【RocketMQ】Console页面报错:rocketmq remote exception,connect to xxx failed.

现象 console报错&#xff0c;无法连接该节点&#xff0c;把该节点杀掉&#xff0c;还是继续报错&#xff0c;重启之后&#xff0c;报错的端口变成11911。 分析 正常一个broker会启动三个端口&#xff0c;不同版本的规律不太一样&#xff0c;4.X版本是&#xff1a; 配置文件…

如何使用ScrapySharp下载网页内容

C#简介 C#是一种由微软开发的通用、面向对象的编程语言。它结合了C和C的优点&#xff0c;并封装了Java的一些特性。C#被广泛评价Windows平台的软件开发&#xff0c;包括Web应用、桌面应用和游戏开发等领域。 使用场景 在网络数据挖掘和信息收集的过程中&#xff0c;我们需要…

3D模型如何制作透明玻璃材质

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 1、什么是玻璃材质 在3D建模和渲染中&#xff0c;玻璃是一种非常常见…

【期末复习】微信小程序复习大纲( 1- 5 章)

前言&#xff1a; 这周开始进入期末复习周&#xff0c;没时间看C/C、linux等知识了&#xff0c;先把期末考试必考的知识捋一遍。 目录 第一章 微信小程序入门 一、填空题 二、判断题 三、选择题 四、简答题 第二章 微信小程序页面制作 一、填空题 二、判…

【六大排序详解】中篇 :选择排序 与 堆排序

选择排序 与 堆排序 选择排序 选择排序 与 堆排序1 选择排序1.1 选择排序原理1.2 排序步骤1.3 代码实现 2 堆排序2.1 堆排序原理2.1.1 大堆与小堆2.1.2 向上调整算法2.1.3 向下调整算法 2.2 排序步骤2.3 代码实现 3 时间复杂度分析 Thanks♪(&#xff65;ω&#xff65;)&#…

智慧交通应用钡铼技术无线工业边缘路由网关R10A

智慧交通应用中&#xff0c;无线工业边缘路由网关扮演着至关重要的角色。在这方面&#xff0c;钡铼技术无线工业边缘路由网关R10A被广泛应用于交通管理系统中&#xff0c;它具备一路RS485、一路WAN、一路LAN、4G和WiFi等功能。本文将详细介绍R10A的参数以及在智慧交通领域的应用…

蓝桥题库(X图形(矩阵))

题目剖析&#xff1a; 简单来说就是找到一个由字母组成的X图形&#xff0c;且每个边上的字母都与中心点的字母相同 算法设计&#xff1a; 1.从中心点向外辐射&#xff0c;每找到一个这样的图形&#xff0c;则次数加一 2.从最外层向中心点靠拢&#xff0c;如果中间遇到不满足…

Unity Shader Early-Z技术

Unity Shader Early-Z技术 Early-Z技术Unity渲染顺序总结Alpha Test&#xff08;Discard&#xff09;在移动平台消耗较大的原因 Early-Z技术 传统的渲染管线中&#xff0c;ZTest其实是在Blending阶段&#xff0c;这时候进行深度测试&#xff0c;所有对象的像素着色器都会计算一…

外汇天眼:交易高手!是这样炼成的!

在外汇市场中&#xff0c;那些总是赚的“盆满钵满”的外汇投资高手实在是让人羡慕不已&#xff0c;他们能够准确预测市场走势&#xff0c;抓住每一个交易机会&#xff0c;实现高收益&#xff0c;很多投资新手因此也想入市&#xff0c;但即使是这样&#xff0c;还是有很多新手对…

关于标准那些事——第五篇 两仪

国家标准的编写&#xff0c;对于标准的名称和结构&#xff0c;很多人往往是不那么在意的&#xff0c;但这恰恰也是非常重要的点&#xff0c;今天就给大家分享一下这太极所生的“两仪”。我会用最精简的文字概括出核心内容&#xff0c;让大家有一个初步且完整的概念&#xff0c;…

规律生活指南:数据可视化助你游刃有余

随着信息时代的到来&#xff0c;我们生活在一个数据海洋中&#xff0c;每天都会面对大量的信息和数字。在这个信息过载的时代&#xff0c;如何从杂乱的数据中找到规律&#xff0c;让生活更加有序成为了一项挑战。而数据可视化作为一种强大的工具&#xff0c;不仅能够帮助我们理…

算法基础之数字三角形

数字三角形 核心思想&#xff1a;线性dp 集合的定义为 f[i][j] –> 到i j点的最大距离 从下往上传值 父节点f[i][j] max(f[i1][j] , f[i1][j1]) w[i][j] 初始化最后一层 f w #include <bits/stdc.h>using namespace std;const int N 510;int w[N][N],f[N][…

ACM模式Java输入输出模板

输入输出练习网站&#xff1a;https://kamacoder.com/ Java读写模板 Scanner 方式一&#xff1a;Scanner&#xff08;效率不高&#xff09; public class Main {public static void main(String[] args) {// 第一个方式ScannerScanner sc new Scanner(System.in);String s …