通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 (史上最全)

news2025/1/20 14:54:49

事件总线在 进行JAVA模块解耦 ,价值巨大

实际开发中,常常 通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 ,
比如,在顶级开源组件 hotkey的源码中, 就多次用到 EventBus/AsyncEventBus进行JAVA模块解耦

在这里插入图片描述

所以,专门写一篇文章,介绍这个 非常适用的技巧和组件。

此文不断更新, 最新版本,请参见 此文的博客园 版本

通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 (史上最全 + 最新版本)

使用事件总线EventBus/AsyncEventBus进行发布订阅JAVA模块解耦

EventBus 是 Guava 的事件处理机制,是观察者模式(生产/消费模型)的一种实现。

观察者模式在我们日常开发中使用非常广泛,例如在订单系统中,订单状态或者物流信息的变更会向用户发送APP推送、短信、通知卖家、买家等等;审批系统中,审批单的流程流转会通知发起审批用户、审批的领导等等。

Observer模式也是 JDK 中自带就支持的,其在 1.0 版本就已经存在 Observer,不过随着 Java 版本的飞速升级,其使用方式一直没有变化,许多程序库提供了更加简单的实现,例如 Guava EventBus、RxJava、EventBus 等

为什么要用 EventBus ,其优点 ?

EventBus 优点

  • 相比 Observer 编程简单方便
  • 通过自定义参数可实现同步、异步操作以及异常处理
  • 单进程使用,无网络影响

缺点

  • 只能单进程使用
  • 项目异常重启或者退出不保证消息持久化

如果需要分布式使用还是需要使用 MQ

EventBus介绍:

EventBus是google的Guava库中的一个处理组件间通信的事件总线,它基于发布/订阅模式,实现了多组件之间通信的解耦合,事件产生方和事件消费方实现解耦分离,提升了通信的简洁性。

为什么使用事件总线?

当一个事件的发生(事件产生方),需要触发很多事件(事件消费方)的时候,我们通常会在事件产生方中,分别的去调用那些事件消费方,这样往往是很浪费资源。事件的产生方与事件的消费方,产生了极大的耦合,如果我们要改动某一个事件消费方,我们很可能还要改动事件的产生方。

使用场景:

在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。

如果是同一个jvm里面通知的话,就可以使用EventBus。由于EventBus使用起来简单、便捷,因此,工作中会经常用到。

EventBus 是线程安全的,分发事件到监听器,并提供相应的方式让监听器注册它们自己。

EventBus允许组件之间进行 “发布-订阅” 式的通信,而不需要这些组件彼此知道对方。

EventBus是专门设计用来替代传统的Java进程内的使用显示注册方式的事件发布模式。

EventBus不是一个通用的发布-订阅系统,也不是用于进程间通信。

EventBus的三个关键点

EventBus有三个关键要素:

img

1、事件(Event)

事件是EventBus之间相互通信的基本单位,一个Event可以是任何类型。

对,没错,就是Object,只要你想将任意一个Bean作为事件,这个类不需要做任何改变,就可以作为事件Event。不过在项目中不会这么随便(除非对代码严谨度没什么要求。。)

,一般会定义特定的事件类,类名以Event作为后缀,里面定义一些变量或者函数等。

2、事件发布者(Publisher)

事件发布者,就是发送事件到EventBus事件总线的一方,事件发布者调用Post()方法,将事件发给EventBus。

你可以在程序的任何地方,调用EventBus的post()方法,发送事件给EventBus,由EventBus发送给订阅者们。

3、事件订阅者(Subscriber)

事件订阅者,就是接收事件的一方,这些订阅者需要在自己的方法上,添加@Subscribe注解声明自己为事件订阅者。不过只声明是不够的,还需要将自己所在的类,注册到EventBus中,EventBus才能扫描到这个订阅者。

EventBus的事件发布和接收

接收事件

一个对象接收事件时,将这样做:

  • 暴露一个public方法 ,称之为事件订阅者(subscriber),

    这个方法接收一个参数,参数的类型是事件期望的类型。

  • 用@Subscribe注解标计这个方法

  • 通过一个EventBus实例的register(Object)方法注册自己

提交事件

提交事件时,将简单的把事件对象作为参数,去调用 EventBus实例的pose(Object)方法。

EventBus实例将根据事件对象的类型,决定如何路由这个事件对象给所有已注册的监听器。

事件的路由是基于事件对象的类型,所以,对象的类型是路由的关键,也是订阅者注册的关键。

一个事件将被交付给可以被分配的任意的订阅者。

当post方法被调用后,所有对这个事件进行注册的订阅者,会按顺序进行消费, 所以订阅者会快速合理地运行。

如果一个事件可能触发一个扩展的过程(比如数据库负载),生成一个线程或队列之后处理,可以使用异步的AsyncEventBus。

订阅方法

事件订阅者的方法必需只能接受一个参数:事件对象。
订阅者方法如果抛出异常,EventBus实例将捕获和记录异常。

很少有方案这样去处理错误,只是在开发时可用于帮助我们发现问题时会这样做。

EventBus实例保证在同时不会有多个线程调用,除非这个方法通过@AllowConcurrentEvents注解明确允许。

如果这个注解没有出现,订阅者方法也无需担心方法被重入,除非在EventBus实例之外有代码调用该方法。

死事件

如果一个事件被提交了,但是没有相应的订阅者接受它,就可以认为这是一个死事件。

然后会给系统一个机会来处理这个死事件。

可以通过包装一个类DeadEvent的实例来处理这个死事件。然后可以写一个类专门负责订阅死事件。

如果一个订阅者监听的事件对象是所有事件的父类,比如这个事件订阅了一个Object的事件对象,那么将不会出现死事件。

案例1:EventBus同步事件总线

首先导入guava的依赖

<dependency>
 <groupId>com.google.guava</groupId>
 <artifactId>guava</artifactId>
 <version>28.2-jre</version>
</dependency>

其次看下本案例目录结构:

在这里插入图片描述

目录结构:

center:

定义和封装 eventbus 消息总线。

event:

一个自定义的事件类,一个普通的java类

类的内容随意定义。

subscribe:

定义了两个事件监听者类,类里面的方法加@Subscribe注解。

testCase:

测试方法。

同步eventbus 事件总线封装

定义和封装 eventbus 消息总线。

package eventbus.center;
 
import com.google.common.eventbus.EventBus;

public class EventBusCenter {
    private static EventBus eventBus;

    //双重锁单例模式
    private static EventBus getEventBus(){
        if(eventBus==null){
            synchronized (EventBus.class){
                if(eventBus==null){
                    eventBus = new EventBus();
                }
            }
        }
        return eventBus;
    }
    public static void post(Object event){
        getEventBus().post(event);
    }
    public static void register(Object object){
        getEventBus().register(object);
    }
 
}

事件定义

package eventbus.event;
/**
 * @desc 自定义事件类
 **/
public class CustomEvent {
    private int data;
    public CustomEvent(int data){
        this.data = data;
    }
    public int getData(){
        return this.data;
    }
}

定义: 订阅者1

package eventbus.subscribe;
import com.google.common.eventbus.Subscribe;
import eventbus.event.CustomEvent;

import java.time.Instant;
/**
 * @desc 事件监听 1
 **/
public class Subscriber1 {
    @Subscribe
    public void test1(CustomEvent event){
        System.out.println(Instant.now() +"监听者1-->回调1,收到事件:"+event.getData()+",线程号为:"+Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Subscribe
    public void test2(CustomEvent event){
        System.out.println(Instant.now() +"监听者1-->回调2,收到事件:"+event.getData()+",线程号为:"+Thread.currentThread().getName());
    }
}

定义: 订阅者2

package eventbus.subscribe;
import com.google.common.eventbus.Subscribe;
import eventbus.event.CustomEvent;

import java.time.Instant;

/**
 * @desc 事件监听 2
 **/
public class Subscriber2 {

    @Subscribe
    public void test1(CustomEvent event){
        System.out.println(Instant.now() +"监听者2-->回调1,收到事件:"+event.getData()+",线程号为:"+Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Subscribe
    public void test2(CustomEvent event){
        System.out.println(Instant.now() +"监听者2-->回调2,收到事件:"+event.getData()+",线程号为:"+Thread.currentThread().getName());
    }
}

订阅者总结

可以看到,两个订阅者,两个订阅者订阅的都是同一个事件对象。

每个订阅者,又定义了两个回调方法,

待会观察一下EventBus同步的方式下,收到事件之后订阅者们的处理方式。

定义测试用例

 @Test
    public  void testEventBus() {
        Subscriber1 listener1 = new Subscriber1();
        Subscriber2 listener2 = new Subscriber2();
        CustomEvent customEvent = new CustomEvent(23);
        EventBusCenter.register(listener1);
        EventBusCenter.register(listener2);
        EventBusCenter.post(customEvent);

        System.out.println(Instant.now() +",主线程执行完毕:"+Thread.currentThread().getName());
    }


上面是测试类,创建了2个订阅者的对象,,并且注册给了EventBus,调用EventBus的同步post方法执行。

结果如下:

执行结果

在这里插入图片描述

同步EventBus总结规律:

可以看到每一个事件的消费方在执行时,都是用的调用方的线程,并且同一时间只能同时执行一个订阅者的方法。

从Subscriber1里的方法比Subscriber2里的方法先执行可以看出:

先注册到EventBus的订阅者在收到事件后会先执行。

案例2:EventBus异步事件总线

AsyncEventBus: 异步事件总线

1.异步执行,事件发送方异步发出事件,不会等待事件消费方是否收到,直接执行自己后面的代码。

2.在定义AsyncEventBus时,构造函数中会传入一个线程池。

事件消费方收到异步事件时,消费方会从线程池中获取一个新的线程来执行自己的任务。

3.同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。

异步eventbus 事件总线封装

定义和封装 异步eventbus 消息总线。

package eventbus.center;

import com.google.common.eventbus.AsyncEventBus;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @desc 事件总线工具类
 **/
public class AsyncEventBusCenter {
    private static AsyncEventBus asyncEventBus;
    private static Executor executor = Executors.newFixedThreadPool(2);

    //双重锁单例模式
    private static AsyncEventBus getAsynEventBus(){
        if(asyncEventBus==null){
            synchronized (AsyncEventBus.class){
                if(asyncEventBus==null){
                    asyncEventBus = new AsyncEventBus(executor);
                }
            }
        }
        return asyncEventBus;
    }

    //异步方式发送事件
    public static void post(Object event){
        getAsynEventBus().post(event);
    }
    public static void register(Object object){
        getAsynEventBus().register(object);
    }
 
}

定义测试用例

   @Test
    public  void testAsyncEventBus() {
        Subscriber1 listener1 = new Subscriber1();
        Subscriber2 listener2 = new Subscriber2();
        CustomEvent customEvent = new CustomEvent(23);
        AsyncEventBusCenter.register(listener1);
        AsyncEventBusCenter.register(listener2);
        AsyncEventBusCenter.post(customEvent);
        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Instant.now() +",主线程执行完毕:"+Thread.currentThread().getName());
    }


上面是测试类,创建了2个订阅者的对象,并且注册给了EventBus,调用EventBus的同步post方法执行。

结果如下:

执行结果

在这里插入图片描述

异步EventBus总结规律:

这里由于并行执行,订阅者的方法中有sleep,因此也让生产者主线程进行了10秒的等待。

注意:这里的生产者和消费者(订阅者)异步执行。

异步执行,两个订阅者同时执行,并且是为事件消费方重新开的一个新的线程去执行自己的任务,互相不等待。

1.如果线程足够,同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序,上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。

2.如果线程不够,谁先注册到EventBus的,谁先执行。

四、EventBus和AsyncEventBus使用区别

上面的测试案例简单,并且很能说明问题。

EventBus: 同步事件总线

1.同步执行,事件发送方在发出事件之后,会等待所有的事件消费方执行完毕后,才会回来继续执行自己后面的代码。

2.事件发送方和事件消费方会在同一个线程中执行,消费方的执行线程取决于发送方。

3.同一个事件的多个订阅者,在接收到事件的顺序上面有不同。

谁先注册到EventBus的,谁先执行,如果是在同一个类中的两个订阅者一起被注册到EventBus的情况,收到事件的顺序跟方法名有关。

AsyncEventBus: 异步事件总线

1.异步执行,事件发送方异步发出事件,不会等待事件消费方是否收到,直接执行自己后面的代码。

2.在定义AsyncEventBus时,构造函数中会传入一个线程池。

事件消费方收到异步事件时,消费方会从线程池中获取一个新的线程来执行自己的任务。

3.如果线程足够,同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序,上没有任何联系,都会同时收到事件,并且都是在新的线程中,异步并发的执行自己的任务。

4.如果线程不够,谁先注册到EventBus的,谁先执行。

建议

生产环境中,建议大家使用异步的 事件总线

参考文献:

  1. 疯狂创客圈 JAVA 高并发 总目录

    ThreadLocal 史上最全

  2. 4000页《尼恩 Java 面试宝典 》的 35个面试专题

  3. 价值10W的架构师知识图谱

4、尼恩 架构师哲学

5、尼恩 3高架构知识宇宙

https://blog.csdn.net/oppo5630/article/details/80173520

https://blog.csdn.net/qq_38345296/article/details/100539989

推荐阅读:

  • 《 场景题:假设10W人突访,你的系统如何做到不 雪崩?》

  • 《尼恩Java面试宝典》

  • 《Springcloud gateway 底层原理、核心实战 (史上最全)》

  • 《Flux、Mono、Reactor 实战(史上最全)》

  • 《sentinel (史上最全)》

  • 《Nacos (史上最全)》

  • 《分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)》

  • 《TCP协议详解 (史上最全)》

  • 《clickhouse 超底层原理 + 高可用实操 (史上最全)》

  • 《nacos高可用(图解+秒懂+史上最全)》

  • 《队列之王: Disruptor 原理、架构、源码 一文穿透》

  • 《环形队列、 条带环形队列 Striped-RingBuffer (史上最全)》

  • 《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)

  • 《单例模式(史上最全)

  • 《红黑树( 图解 + 秒懂 + 史上最全)》

  • 《分布式事务 (秒懂)》

  • 《缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)》

  • 《缓存之王:Caffeine 的使用(史上最全)》

  • 《Java Agent 探针、字节码增强 ByteBuddy(史上最全)》

  • 《Docker原理(图解+秒懂+史上最全)》

  • 《Redis分布式锁(图解 - 秒懂 - 史上最全)》

  • 《Zookeeper 分布式锁 - 图解 - 秒懂》

  • 《Zookeeper Curator 事件监听 - 10分钟看懂》

  • 《Netty 粘包 拆包 | 史上最全解读》

  • 《Netty 100万级高并发服务器配置》

  • 《Springcloud 高并发 配置 (一文全懂)》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/158129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

71、AdaNeRF: Adaptive Sampling for Real-time Rendering of Neural Radiance Fields

简介 官网&#xff1a;https://thomasneff.github.io/adanerf/ 新的双网络架构&#xff0c;它采用正交方向&#xff0c;通过学习如何最好地减少所需样本点的数量&#xff0c;将网络分为联合训练的 sample 和 shading 网络&#xff0c;训练方案在每条射线上采用固定的样本位置…

Nginx 高级篇

文章目录Nginx 高级篇一、 负载均衡1、 负载均衡概述2、 处理方式2.1 用户手动选择2.2 DNS 轮询2.3 四 / 七层负载均衡3、 七层负载均衡3.1 七层负载均衡指令3.1.1 upstream3.1.2 server3.2 实现流程3.3 负载均衡的状态3.3.1 down3.3.2 backup3.3.3 max_conns3.3.4 max_fails &…

Docker前世今生

文章目录Docker背景Docker历史docker 理念Docker能做什么虚拟机的缺点容器虚拟化技术Docker学习途径Docker背景 一款产品从开发到上线&#xff0c;从操作系统&#xff0c;到运行环境&#xff0c;再到应用配置。作为开发运维之间的协作我们需要 关心很多东西&#xff0c;这也是…

ChatGPTAPI Key申请教程

ChatGPTAPI Key申请教程 一、API Key申请使用 在浏览器打开网址&#xff1a;https://openai.com/api/ 等待网页加载完成后&#xff0c;点击右上角 LOG IN 进入登录界面 进入登录界面后&#xff0c;依次输入注册的邮箱–Continue–输入密码–Continue&#xff0c;完成登录&…

< CSS小技巧:filter滤镜妙用>

文章目录&#x1f449; 前言&#x1f449; 简述&#x1f449; 基本语法及案例》语法简述》案例&#x1f449; 拓展1. drop-shadow 更加智能的阴影效果2. 网页置灰3. 元素强调、高亮4.节省空间&#xff0c;提高网页加载速度&#x1f449; 具体案例网页参考文献往期内容 &#x1…

如何有效进行团队建设:从关注事到关注人

咱打工人都想趁着年终总结这个契机&#xff0c;拿着工作数据跟领导们提涨薪&#xff01;但是入行没多久的社畜们却没有这个底气&#xff0c;虽累但没结果&#xff08;暗指身兼数职的项目经理小白们&#xff09;&#xff0c;主要是觉得自己的工作成绩不够优秀。这几天办公室的项…

Model-Agnostic Meta-Learning for Fast Adaptation of Deep Networks

摘要 我们为元学习提出了一个算法是模型无关model−agnosticmodel-agnosticmodel−agnostic. 在某种意义上&#xff0c;其与用梯度下降训练的模型是兼容的&#xff0c;可以应用在大量不同的学习问题上。包括&#xff1a;分类、回归、和加强学习。 元学习的目标是正在学习任务…

TAZ生成实践(Intel芯片Mac Python 3.7.9)

参考文章 https://blog.csdn.net/weixin_42632778/article/details/115164518 TAZ生成 https://zhuanlan.zhihu.com/p/343576683 使用ArcGIS实现线转栅格 https://pro.arcgis.com/zh-cn/pro-app/latest/tool-reference/conversion/polyline-to-raster.htm ArcGIS Pro 折线转栅格…

第②篇 Spring IoC——容器

Spring最成功的是其提出的理念&#xff0c;而不是技术本身。 概念 Spring所依赖的两个核心理念&#xff1a; 一个是控制反转&#xff08;IoC&#xff09;。另一个是面向切面编程&#xff08;Aspect Oriented Programming&#xff0c;AOP&#xff09;。 IoC是Spring的核心&am…

JS入门到精通详解(1)

JavaScript概述(需要记)什么是javascript?是一门&#xff08;基于对象&#xff09;和&#xff08;事件驱动&#xff09;的&#xff08;脚本语言&#xff09;。js诞生于哪一年&#xff1f;哪个公司&#xff1f;谁&#xff1f;第一个名字叫什么&#xff1f;1995年 网景 布兰登 l…

【Python】type、isinstance、issubclass详解

type type方法有两种重载形式&#xff1a; type(o: object)&#xff1b;type(name: str, bases:Tuple[type, ...], dict:Mapping[str: Any], **kwds) 使用第一种重载形式的时候&#xff0c;传入一个【object】类型&#xff0c;返回一个【type】对象&#xff0c;通常与object…

解决使用element-plus时使用el-select-v2组件时,选中后无法移除focus的状态的方法。

我们可以使用element-ui-plus的el-select-v2的组件&#xff0c;实现复合搜索和下拉框的功能。 使用如下模块&#xff1a; <template><el-select-v2 v-model"value" filterable :options"options" placeholder"Please select"visibleCha…

爸妈记性变差怎么办?

记不住事的时候&#xff0c;我们总会自嘲“老了&#xff0c;脑子不好使了”。记忆力总是和年龄挂钩的&#xff0c;所以很多子女听到父母这样说&#xff0c;也不会放在心上。但有时&#xff0c;记性变差不一定因为年龄&#xff0c;还有可能是患病的前兆。当父母出现频繁忘事的情…

zerotier虚拟网络配置,局域网与外网如同局域网一样访问。

zerotier:可以搭建用于自己的虚拟网络&#xff0c;经过授权连接成功之后彼此都在同一网段&#xff0c;可以像在局域网一样互相访问。 1.创建zerotier账户 2.创建网络&#xff08;Create A Network&#xff09;并记住网络标识&#xff08;NETWORK ID&#xff09; 一、openwrt设…

中华财险进击数字化

本文来源 / 瞭望 中华联合财产保险股份有限公司&#xff08;下称中华财险&#xff09;&#xff0c;是一家 36 年的老牌国有保险公司&#xff0c;全国营业网点超过 2900 个。近年来&#xff0c;中华财险在业务高速发展的同时&#xff0c;从难啃的硬骨头下手&#xff0c;重构核心…

Unity 之 Addressable可寻址系统 -- 可寻址系统面板介绍 -- 入门(二)

可寻址系统面板介绍 -- 入门&#xff08;二&#xff09;一&#xff0c;可寻址系统目录介绍1.2 创建分组1.2 目录介绍二&#xff0c;可寻址系统设置介绍2.1 Profile - 配置文件2.2 Catalog - 目录2.3 Content Update - 内容更新2.4 Downloads - 下载2.5 Build - 构建2.6 Build a…

【数据结构】5.6 树和森林

文章目录5.6.1 树的存储结构&#xff08;不是二叉树&#xff09;双亲表示法孩子表示法结构定义双亲孩子法孩子兄弟法5.6.2 二叉树的转换树与二叉树的转换将树转换成二叉树将二叉树转换成树森林与二叉树的转换森林转换成二叉树二叉树转换成森林5.6.3 树和森林的遍历树的遍历森林…

Nginx简介

一、什么是Nginx?Nginx是一个高性能的HTTP和反向代理Web服务器&#xff0c;同时也提供IMAP/POP3/SMTP服务。Nginx是一款轻量级的Web服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3/SMTP&#xff09;代理服务器。Nginx的特点是&#xff1a;占有内存少&#xff0c;并发能…

JJWT实现令牌Token

登录实现方式 Session 详情&#xff1a; https://www.cnblogs.com/andy-zhou/p/5360107.html 会话的概念 会话就好比打电话&#xff0c;一次通话可以理解为一次会话。我们登录一个网站&#xff0c;在一个网站上不同的页面浏览&#xff0c;最后退出这个网站&#xff0c;也是…

【Java AWT 图形界面编程】Container 容器 ② ( Frame 窗口示例 | Panel 示例 | 窗口中文乱码处理 )

文章目录一、Frame 窗口示例二、Panel 示例三、窗口中文乱码处理一、Frame 窗口示例 首先 , 创建 Frame 实例对象 , 该对象就是 操作系统中应用软件的 窗口 ; // 1. 创建窗口 Frame frame new Frame("AWT 图形界面编程");Frame 是 Window 的子类 , public class F…