SpringBoot自定义消息总线

news2024/11/19 13:27:03

一、前言

        在现代的分布式系统中,消息传递已成为一个非常流行的模式。它使得系统内的不同部分可以松耦合地通信,从而实现更高效、更可靠的应用程序。本博客将介绍SpringBoot如何提供简单易用的消息传递机制,并展示如何自定义消息总线以满足特定需求。

二、依赖引入

// gradle 自身需求资源库 放头部
buildscript {
    repositories {
        maven { url 'https://maven.aliyun.com/repository/public' }// 加载其他Maven仓库
        mavenCentral()
    }
    dependencies {
        classpath('org.springframework.boot:spring-boot-gradle-plugin:2.1.1.RELEASE')// 加载插件,用到里面的函数方法
    }
}


apply plugin: 'java'
apply plugin: 'idea'
// 使用spring boot 框架
apply plugin: 'org.springframework.boot'
// 使用spring boot的自动依赖管理
apply plugin: 'io.spring.dependency-management'

// 版本信息
group 'com.littledyf'
version '1.0-SNAPSHOT'

// 执行项目中所使用的的资源仓库
repositories {
    maven { url 'https://maven.aliyun.com/repository/public' }
    mavenCentral()
}

// 项目中需要的依赖
dependencies {
    // 添加 jupiter 测试的依赖
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    // 添加 jupiter 测试的依赖
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

    // 添加 spring-boot-starter-web 的依赖 必须 排除了security 根据自身需求
    implementation('org.springframework.boot:spring-boot-starter-web') {
        exclude group: 'org.springframework.security', module: 'spring-security-config'
    }

    // 添加 spring-boot-starter-test 该依赖对于编译测试是必须的,默认包含编译产品依赖和编译时依赖
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    // 添加 junit 测试的依赖
    testImplementation group: 'junit', name: 'junit', version: '4.11'
    // 添加 lombok
    annotationProcessor 'org.projectlombok:lombok:1.18.22' // annotationProcessor代表main下代码的注解执行器
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'// testAnnotationProcessor代表test下代码的注解执行器
    compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.22' // compile代表编译时使用的lombok



}


test {
    useJUnitPlatform()
}

三、代码

        定义注册器实现类:

import org.springframework.context.ApplicationContext;
import org.springframework.core.GenericTypeResolver;

import java.util.HashMap;
import java.util.Map;

/**
 * @description 注册器
 */
public class Registry {

    /**
     * Query对象和命令提供者的对应关系
     */
    private Map<Class<? extends Query>,QueryProvider> queryProviderMap =  new HashMap<>();

    /**
     * Event对象和命令提供者的对应关系
     */
    private Map<Class<? extends Event>,EventProvider> eventProviderMap =  new HashMap<>();

    public Registry(ApplicationContext applicationContext){
        String[] names = applicationContext.getBeanNamesForType(QueryHandler.class);
        for (String name : names) {
            registerQuery(applicationContext,name);
        }
        names = applicationContext.getBeanNamesForType(EventHandler.class);
        for (String name : names) {
            registerEvent(applicationContext,name);
        }
    }

    private void registerQuery(ApplicationContext applicationContext, String name) {
        Class<QueryHandler<?,?>> handlerClass = (Class<QueryHandler<?,?>>) applicationContext.getType(name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, QueryHandler.class);
        Class<? extends Query> queryType  = (Class<? extends Query>) generics[1];
        queryProviderMap.put(queryType, new QueryProvider(applicationContext, handlerClass));
    }

    private void registerEvent(ApplicationContext applicationContext, String name) {
        Class<EventHandler<?>> handlerClass = (    Class<EventHandler<?>>) applicationContext.getType(name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, EventHandler.class);
        Class<? extends Event> eventType  = (Class<? extends Event>) generics[0];
        eventProviderMap.put(eventType, new EventProvider(applicationContext, handlerClass));
    }

    /**
     * 获取具体的QueryHandler   <R, Q extends Query<R>>定义R  Q的类型
     * @param queryClass
     * @param <R>
     * @param <Q>
     * @return
     */
    <R, Q extends Query<R>> QueryHandler<R,Q> getQuery(Class<Q> queryClass) {
        return queryProviderMap.get(queryClass).get();
    }

    /**
     * 获取具体的EventHandler
     * @param eventClass
     * @return
     */
    <E extends Event> EventHandler<E> getEvent(Class<? extends Event> eventClass) {
        return eventProviderMap.get(eventClass).get();
    }
}

        消息总线接口,定义两个方法,一个执行查询,一个执行事件:

/**
 * @description  消息总线
 */
public interface Bus {
    <R,Q extends Query<R>> R executeQuery(Q query);

    <E extends Event> void dispatchEvent(E event);
}

        消息总线实现类:

public class SpringBus implements Bus {

    private final Registry registry;

    public SpringBus(Registry registry) {
        this.registry = registry;
    }


    @Override
    public <R, Q extends Query<R>> R executeQuery(Q query) {
        QueryHandler<R, Q> queryHandler = (QueryHandler<R, Q>) registry.getQuery(query.getClass());
        return queryHandler.handle(query);
    }

    @Override
    public <E extends Event> void dispatchEvent(E event) {
        EventHandler<E> eventHandler = (EventHandler<E>) registry.getEvent(event.getClass());
        eventHandler.process(event);
    }
}

        Query接口:

public interface Query<R> {

}

        QueryHandler接口:

public interface QueryHandler<R, C extends Query<R>> {
    R handle(C query);
}

        QueryProvider类:

import org.springframework.context.ApplicationContext;

/**
 * query  提供者
 * @param <H>
 */
public class QueryProvider<H extends QueryHandler<?, ?>> {
    private final ApplicationContext applicationContext;
    private final Class<H> type;

    QueryProvider(ApplicationContext applicationContext, Class<H> type) {
        this.applicationContext = applicationContext;
        this.type = type;
    }

    public H get() {
        return applicationContext.getBean(type);
    }
}

        Event类似,Event接口:

public interface Event {

}

        EventHandler接口:

/**
 * @description  事件处理器
 */
public interface EventHandler<E extends Event> {
    /**
     *
     * @param event  事件
     */
    void process(E event);
}

        EventProvider类:

import org.springframework.context.ApplicationContext;

/**
 * event  提供者
 * @param <H>
 */
public class EventProvider<H extends EventHandler<?>> {
    private final ApplicationContext applicationContext;
    private final Class<H> type;

    EventProvider(ApplicationContext applicationContext, Class<H> type) {
        this.applicationContext = applicationContext;
        this.type = type;
    }

    public H get() {
        return applicationContext.getBean(type);
    }
}

        实体类:

import com.littledyf.cqs.Query;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class TestDto implements Serializable, Query<List<TestVo>> {
    private String name;
}
import lombok.Data;

@Data
public class TestVo {

    private String nameVo;
}

        Query具体实现类:

import com.littledyf.cqs.QueryHandler;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
@NoArgsConstructor
public class TestQueryHandler implements QueryHandler<List<TestVo>, TestDto> {

    @Override
    public List<TestVo> handle(TestDto testDto) {

        List<TestVo> testVos = new ArrayList<>();
        TestVo testVo = new TestVo();
        testVo.setNameVo(testDto.getName());

        testVos.add(testVo);
        return testVos;
    }
}

        Controller层:

import com.littledyf.cqs.Bus;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@RestController
@RequestMapping("/my-test/cqs")
public class CqsController {

    @Resource
    private Bus bus;

    @PostMapping(value = "/query-test")
    public List<TestVo> queryTest(@RequestBody TestDto testDto)  {
        return bus.executeQuery(testDto);
    }
}

        SpringBoot启动类,启动类中进行ApplicationContext的注入:

import com.littledyf.cqs.Bus;
import com.littledyf.cqs.Registry;
import com.littledyf.cqs.SpringBus;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MyTestApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyTestApplication.class, args);
	}

	/**
	 * 注册器
	 */
	@Bean
	public Registry registry(ApplicationContext applicationContext) {
		return new Registry(applicationContext);
	}

	/**
	 * 消息总线
	 */
	@Bean
	public Bus commandBus(Registry registry) {
		return new SpringBus(registry);
	}
}

        yml文件配置:

server:
  port: 8080
spring:
  application:
    name: my-test-service

四、测试

        这里只要模拟了查询,事件等与查询类似,需要实现具体的接口。整体实现就是在SpringBoot启动时加载注册类,注册类会根据具体的类注入相应的bean,在具体调用时,会根据不同的类实现调用相关的bean。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/962066.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安装使用 d3graph 时出现 TypeError 的解决方法

使用 python 3.7 pip 22.3.1 在清华镜像源 https://pypi.tuna.tsinghua.edu.cn/simple 安装 d3blocks 1.3.2 时&#xff0c;安装成功后导入包时出错&#xff1a; 观察报错信息可以看到出错的代码&#xff08;902 行&#xff09;使用了类型指定语法&#xff0c;这是最新的 pyth…

stable diffusion实践操作-电脑硬件查看

本文专门开一节写电脑硬件相关的内容&#xff0c;在看之前&#xff0c;可以同步关注&#xff1a; stable diffusion实践操作 正文 1、检查电脑显存的方法&#xff08;win10&#xff09;&#xff1a; 鼠标放在工具栏&#xff0c;单击右键打开“任务管理器”&#xff0c;选择顶…

jmeter 固定定时器

固定定时器&#xff08;Constant Timer&#xff09;是一个定时器元件&#xff0c;可以在线程组中的每个线程之间添加固定的延迟时间。固定定时器会对每个线程的执行进行一定的暂停。 聊一下和线程组中的调度器对线程组执行时长的影响&#xff1a; 相同&#xff1a; 都会影响线…

【线性代数】矩阵求导的本质与分子布局、分母布局的本质(矩阵求导——本质篇)

矩阵求导的本质与分子布局、分母布局的本质&#xff08;矩阵求导——本质篇&#xff09; 说在前面一. 函数与标量、向量、矩阵二. 矩阵求导的本质三. 矩阵求导结果的布局四. 分子布局、分母布局的本质五. 向量变元的实值标量函数 说在前面 我将严谨地说明矩阵求导的本质与分子布…

游戏思考30(补充版):关于逆水寒铁牢关副本、白石副本和技能的一些注释(2023/0902)

前期介绍 我是一名逆水寒的玩家&#xff0c;做一些游戏的笔记当作攻略记录下来&#xff0c;荣光不朽-帝霸来源视频连接 传送门 一、旧版铁牢关&#xff08;非逆水寒老兵服&#xff09; &#xff08;1&#xff09;老一&#xff1a;巨鹰 1&#xff09;机制一&#xff1a;三阵风…

多通道振弦数据记录仪应用桥梁安全监测的关键要点

多通道振弦数据记录仪应用桥梁安全监测的关键要点 随着近年来桥梁建设和维护的不断推进&#xff0c;桥梁安全监测越来越成为公共关注的焦点。多通道振弦数据记录仪因其高效、准确的数据采集和处理能力&#xff0c;已经成为桥梁安全监测中不可或缺的设备。本文将从以下几个方面…

JavaScript基础02

JavaScript 基础 文章目录 JavaScript 基础运算符算术运算符赋值运算符自增/自减运算符比较运算符逻辑运算符运算符优先级 语句表达式和语句分支语句if 分支语句if双分支语句if 多分支语句三元运算符&#xff08;三元表达式&#xff09;switch语句&#xff08;了解&#xff09;…

孙哥Spring源码第16集

第16集 refresh()-prepareBeanFactory分析 【视频来源于&#xff1a;B站up主孙帅suns Spring源码视频】 1、设置类加载器 2、设置SpringEL表达式 解析器 3、设置内置的属性编辑器 &#xff08;类型转换器&#xff09; 3.1、如何实现类型转化&#xff1f; 1、Converter 2、P…

Ubuntu 20.04 Server配置网络

0&#xff0c;环境 服务器&#xff1a; Intel(R) Xeon(R) Gold 6248R CPU 3.00GHz 96核 网卡&#xff1a; 多网卡 1&#xff0c; 镜像下载 http://old-releases.ubuntu.com/releases/ubuntu-20.04.1-desktop-amd64.iso 2&#xff0c; 系统安装--具体步骤就不贴出来&#…

Navicat 强大的数据模型功能 | 面向数据库设计、架构和数据资产梳理等使用场景

数据模型是用来描述数据、组织数据和对数据进行操作的一组概念和定义。根据不同的应用需求&#xff0c;数据模型可以分为概念模型、逻辑模型和物理模型。这些数据模型帮助数据库设计人员设计和管理数据库&#xff0c;以满足用户的需求。 Navicat 强大的数据模型功能主要适用于…

Linux c++开发-03-使用CMake组织工程

一、简单文件的编译 有如下的目录结构&#xff1a; 其中 helloworld.cpp如下&#xff1a; #include <iostream> using namespace std; int main() {printf("hello world my name is Ty!");return 0; }CMakeLists.txt如下&#xff1a; cmake_minimum_requir…

cocos creator配置终端调试

在launch.json里添加"preLaunchTask":“CocosCreator compile” 在cocos creator里选择开发者&#xff0c;visual studio code工作流&#xff0c;选择添加编译任务。 添加 settings.json {"files.exclude":{"**/.git": true,"**/.DS_Sto…

数据结构与算法(三)线性表

线性表定义 线性表&#xff08;List&#xff09;&#xff1a;零个或多个数据元素的有限序列。 首先它是一个序列&#xff0c;其次&#xff0c;线性表强调是有限的。 前驱元素&#xff1a;若A元素在B元素的前面&#xff0c;则称A为B的前驱元素 后继元素&#xff1a;若B元素在…

【代码随想录day23】不同路径

题目 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff09;。 问总共有多少条不同的路径&#xff1f; 示…

明茨伯格的人际关系角色理论

明茨伯格的人际关系角色理论是由社会心理学家明茨伯格&#xff08;William Schutz&#xff09;在20世纪50年代提出的一种关于人际关系的理论。该理论主要探讨了人际关系中的三个角色&#xff1a;包容性、控制性和亲密性。这些角色代表了人们在互动中所表现出的行为和需求。下面…

ClickHouse进阶(六):副本与分片-2-Distributed引擎

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术,IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &#x1f4cc;订阅…

Qt各个版本下载及安装教程(离线和非离线安装)

Qt各个版本下载链接&#xff1a; Index of /archive/qthttps://download.qt.io/archive/qt/ 离线安装 &#xff0c;离线安装很无脑&#xff0c;下一步下一步就可以。 我离线下载 半个小时把2G的exe下载下来了

使用boost::geometry::union_ 合并边界(内、外)- 方案一

使用boost::geometry::union_ 合并边界&#xff08;内、外&#xff09;&#xff1a;方案一 结合 boost::geometry::read_wkt() 函数 #include <iostream> #include <vector>#include <boost/geometry.hpp> #include <boost/geometry/geometries/point_x…

linux C编程 获取系统时间

1.clock_gettime #include<time.h> int clock_gettime(clockid_t clk_id,struct timespec *tp); struct timespec {time_t tv_sec; /* 秒*/long tv_nsec; /* 纳秒*/ }clk_id : CLOCK_BOOTTIME&#xff0c;以系统启动时间为时间原点的时间体系&#xff0c;不受其它因素的…

PowerQuery动态加载M公式

Power Query 是Excel中的强大数据处理与转换工具&#xff0c;如果需要“动态”处理数据&#xff0c;大家第一时间想到的是可以使用VBA&#xff0c;利用代码创建M公式&#xff0c;进而创建PQ查询&#xff0c;但是复杂的M公式可能有很多行&#xff0c; 使用VBA处理起来并不是很方…