如何创造一个属于自己的springboot stater

news2025/1/18 20:27:56

如何创造一个属于自己的springboot stater

  • 什么是stater
    • stater是怎么实现注入进来的
    • 如何进行约定
  • 基于上述理论的demo
    • 实现功能
    • 代码目录
    • 核心实现
      • spring.factories
      • SpringMessageSubscribe(扫描所有@Subscribe注解生成消息订阅)
      • 基于Redis的消息订阅
      • 基于redis的消息发布
      • 针对多个相同topic的订阅者进行消息多播

什么是stater

stater是一种特殊的spring boot 工程,它实现了一些共同性的功能,使你可以依赖过来直接使用,又在配置上做出了一些默认的约定,是你不需要进行复杂的配置。

stater是怎么实现注入进来的

有spring boot基础知识的同学都知道,当boot工程启动时默认是扫描本包及其子孙包下的class,看其是否被注解,是否要纳入spring 容器进行管理,那么stater的包名肯定是和你的业务包名不一致的那么他是怎么实现上述配置的呢,也就是spring boot 约定大于配置的原理是什么呢?
秘密是在于spring.factories文件。spring容器启动时会读取META-INF/spring.factories文件内容,将其中的类进行实例化并放入容器进行管理。下面是一个例子

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.soft863.stream.core.eventbus.EventBus,\
  com.soft863.stream.core.eventbus.subscribe.SpringMessageSubscribe,\
  com.soft863.stream.core.eventbus.config.RedisConfig,\
  com.soft863.stream.core.eventbus.subscribe.redis.RedisMessageBroker,\
  com.soft863.stream.core.eventbus.publish.redis.RedisMessagePublisher

按照例子中所写,及时stater中的这些类并不在启动工程的application下面,也是可以扫面到进行实例化管理的。

如何进行约定

在一些共通性的功能中经常会出现不一样的分支处理,比如数据库可能有mysql,oracle等区别,比如我想实现一个MQ消息通信那么也可能有rabbitmq rocketmq这些不通的实现,那么我怎么能根据用户的配置进行不同的实例化,或者说我使用一个默认的功能实现呢,那么就需要用一个很重要的注解@ConditionalOnProperty

举一个例子,我想做一个消息总线,其中通信组件我有几个选型比如redis的stream,rabbitmq rocketmq。基于轻量化的考虑,我想让其默认为redis实现,那么就可以这样写

@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)

这一块代码的意思如下

  • prefix 指的是yaml文件中配置前缀
  • name 则指的是配置的名称
  • havingValue 指name这个配置的值是否包含该值
  • matchIfMissing 如果没有name这个配置,这个注解是否生效

综合下来就是查看配置文件中是否有stream.broker.type这个参数,如果有看他是否是redis如果是redis则实例化这个bean,如果没有也同样实例化(当然了这个类必须被
@Component所注解),只有存在这个参数且不是redis的时候才不实例化。这样就可以实现默认redis的功能了。
具有同样功能的注解还有以下,可以根据名称大致了解其中的含义
在这里插入图片描述

基于上述理论的demo

实现功能

默认基于redis stream进行发布订阅模式的事件处理消息总线。使用过程中只需在对应方法上加上@Subscribe注解即可订阅消息,不需要自己写其他的处理代码

代码目录

在这里插入图片描述

核心实现

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.soft863.stream.core.eventbus.EventBus,\
  com.soft863.stream.core.eventbus.subscribe.SpringMessageSubscribe,\
  com.soft863.stream.core.eventbus.config.RedisConfig,\
  com.soft863.stream.core.eventbus.subscribe.redis.RedisMessageBroker,\
  com.soft863.stream.core.eventbus.publish.redis.RedisMessagePublisher

SpringMessageSubscribe(扫描所有@Subscribe注解生成消息订阅)

package com.soft863.stream.core.eventbus.subscribe;

import com.soft863.stream.core.eventbus.EventBus;
import com.soft863.stream.core.annotation.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;

@Component
@Slf4j
public class SpringMessageSubscribe implements BeanPostProcessor {

    private final EventBus eventBus;

    private final MessageAdapterCrater messageAdapterCrater = new DefaultMessageAdapterCrater();

    public SpringMessageSubscribe(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * 为注解添加监听处理
     *
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> type = ClassUtils.getUserClass(bean);

        ReflectionUtils.doWithMethods(type, method -> {
            // 取得所有订阅方法
            AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
            if (CollectionUtils.isEmpty(subscribes)) {
                return;
            }
            // 生成订阅Adapter
            MessageAdapter sub = messageAdapterCrater.createMessageAdapter(type, method, subscribes.getString("topic"));
            // 注册订阅Adapter
            eventBus.addAdapter(sub, subscribes.getString("topic"));
        });
        return bean;
    }
}

基于Redis的消息订阅

package com.soft863.stream.core.eventbus.subscribe.redis;

import com.soft863.stream.core.eventbus.EventBus;
import com.soft863.stream.core.eventbus.subscribe.EventMessageBroker;
import com.soft863.stream.core.eventbus.message.AdapterMessage;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Collections;

/**
 * 消息总线-集群间消息redis实现
 *
 * 实现规则
 *     根据不同的MQ机制实现消息的分组消费,将原始消息转化为AdapterMessage,然后使用继承于EventMessageBroker进行消息多播
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
public class RedisMessageBroker  extends EventMessageBroker implements StreamListener<String, MapRecord<String, String, String>> {

    @Value("${stream.topic:/stream}")
    String topic;

    @Value("${stream.consumer.id}")
    String consumerId;

    @Value("${stream.timeout:10}")
    Integer timeout;

    @Value("${stream.consumer.group}")
    String groupName = "stream.redis";

    private final RedisTemplate<String, String> streamRedisTemplate;
    private final ApplicationContext applicationContext;
    private final EventBus eventBus;

    public RedisMessageBroker(RedisTemplate<String, String> streamRedisTemplate, ApplicationContext applicationContext, EventBus eventBus) {
        this.streamRedisTemplate = streamRedisTemplate;
        this.applicationContext = applicationContext;
        this.eventBus = eventBus;
    }


    @SneakyThrows
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        log.info("消息内容-->{}", message.getValue());
        StreamOperations<String, String, String> streamOperations = streamRedisTemplate.opsForStream();

        // 服务内消息多播
        AdapterMessage adapterMessage = new AdapterMessage();
        adapterMessage.setTopic(message.getValue().get("topic"));
        adapterMessage.setPayload(message.getValue().get("payload"));
        try {
            this.multicastEvent(adapterMessage);
        } catch (Exception e) {
            log.info("消息多播失败:" + e.getLocalizedMessage());
        }
        //消息应答
        streamOperations.acknowledge(topic, groupName, message.getId());
    }


    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> emailListenerContainerOptions() {

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block读取超时时间
                .pollTimeout(Duration.ofSeconds(timeout))
                //count 数量(一次只获取一条消息)
                .batchSize(1)
                //序列化规则
                .serializer(stringRedisSerializer)
                .build();
    }

    /**
     * 开启监听器接收消息
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                                                            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions) {
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 创建 stream 流
        if (!streamRedisTemplate.hasKey(topic)) {
            streamRedisTemplate.opsForStream().add(topic, Collections.singletonMap("", ""));
            log.info("初始化集群间通信Topic{} success", topic);
        }

        //创建消费者组
        try {
            streamRedisTemplate.opsForStream().createGroup(topic, groupName);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在", groupName);
        }

        //注册消费者 消费者名称,从哪条消息开始消费,消费者类
        // > 表示没消费过的消息
        // $ 表示最新的消息
        listenerContainer.receive(
                Consumer.from(groupName, consumerId),
                StreamOffset.create(topic, ReadOffset.lastConsumed()),
                this
        );

        listenerContainer.start();
        return listenerContainer;
    }

    @Override
    public void multicastEvent(AdapterMessage message) throws IllegalAccessException, InstantiationException, InvocationTargetException, IOException {
        super.doMulticastEvent(message, eventBus.getSubscribesPool(), applicationContext);
    }
}

基于redis的消息发布

package com.soft863.stream.core.eventbus.publish.redis;

import com.soft863.stream.core.eventbus.message.AdapterMessage;
import com.soft863.stream.core.eventbus.publish.MessagePublisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 消息总线发布Redis实现
 */
@Component
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
public class RedisMessagePublisher implements MessagePublisher {

    @Value("${stream.topic:/stream}")
    String topic;

    private final RedisConnectionFactory connectionFactory;

    public RedisMessagePublisher(RedisConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Override
    public Boolean publish(AdapterMessage message) {
        Map value = new HashMap();
        value.put("topic".getBytes(), message.getTopic().getBytes());
        value.put("payload".getBytes(), message.getPayload().getBytes());
        ByteRecord byteRecord = StreamRecords.rawBytes(value).withStreamKey(topic.getBytes());
        // 刚追加记录的记录ID
        RecordId recordId = connectionFactory.getConnection().xAdd(byteRecord);
        return true;
    }
}

针对多个相同topic的订阅者进行消息多播

package com.soft863.stream.core.eventbus.subscribe;

import com.alibaba.fastjson.JSON;
import com.soft863.stream.core.eventbus.message.AdapterMessage;
import com.soft863.stream.core.util.TopicMatcher;
import com.soft863.stream.core.util.TopicUtil;
import org.springframework.context.ApplicationContext;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;

public abstract class EventMessageBroker {

    /**
     * 消息多播
     *
     * 将集群间消息在自己服务内部广播,是的所有订阅消息都可以收到
     *
     * @param message
     * @return
     */
    public abstract void multicastEvent(AdapterMessage message) throws IllegalAccessException, InstantiationException, InvocationTargetException, IOException;

    public void doMulticastEvent(AdapterMessage message, Map<TopicMatcher, Map<String, MessageAdapter>> subscribesPool, ApplicationContext applicationContext) throws InvocationTargetException, IllegalAccessException, InstantiationException, IOException {
        // 查找相匹配的Adapter
        List<MessageAdapter> adapterList = TopicUtil.getAllMatchedAdapter(message.getTopic(), subscribesPool);

        for (MessageAdapter adapter : adapterList) {
            if (adapter != null && adapter.isActive()) {
                // 手动订阅消息
                if (adapter.getCustomer() != null) {
                    adapter.getCustomer().consume(message);
                } else {
                    // 取得对象
                    Object instance = applicationContext.getBean(adapter.getClazz());
                    if (instance != null) {
                        // 将消息转化为所需要的类型
                        if (adapter.getMethod().getParameterCount() > 0) {
                            Class<?> param = adapter.getMethod().getParameterTypes()[0];
                            adapter.getMethod().invoke(instance, JSON.parseObject(message.getPayload(), param));
                        } else {
                            adapter.getMethod().invoke(instance);
                        }

                    }
                }
            }

        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/554617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对封装好的Vue组件库进行打包,并发布到npm上

1. 新建vue 项目 并且在根目录创建两个文件夹 packages和examples。 packages&#xff1a;用于存放所有的组件 examples&#xff1a;用于进行测试组件&#xff0c;把src改为examples 2.配置vue.config.js 并设置入口文件 如果没有vue.config.js文件 就需要在项目根目录下创…

数说故事@FBIC丨首发食饮SMI社媒心智品牌榜,为品牌支招紧跟健康新风尚

第八届Foodaily创博会&#xff08;FBIC全球食品饮料创新大会&#xff09;于5月14-16日在上海跨国采购会展中心圆满落幕&#xff0c;呈现了一场食品饮料行业盛会。数说故事与众多食饮健康品牌一起&#xff0c;走过了一段大数据AI加持的创新之旅。 数说故事VP孙淑娟Jessie受邀分享…

Android APP 集成系统签名

由于android 系统权限限制&#xff0c;很多时候普通APP权限无法完成&#xff0c;需要系统APP才有足够的权限&#xff0c; 比如&#xff1a;安装、卸载应用&#xff0c;重启设备&#xff0c;恢复出厂设置&#xff0c;以及设置里面的一些功能&#xff0c;都是需要系统权限才能调…

【WLAN网络故障,带你搞定它!】

01 无线网卡搜索不到 AP的无线信号 01 问题现象 无线网卡搜索不到 AP 的无线信号 02 问题分析 无线网卡搜索不到 AP 的信号 ,原因可以从两方面着手&#xff1a; 1.无线网卡 AP本身 在遇到该问题的时候&#xff0c;我们可以从以上两个方面进行处理。 03 处理过程 1.无线…

Python GUI编程:使用wxPython处理长文本

这段代码的应用场景有&#xff1a; 在文本编辑器和IDE等应用程序中&#xff0c;可以使用这个示例代码来处理长文本&#xff0c;以便用户更好地查看和编辑文本。在数据分析和科学计算等领域中&#xff0c;可以使用这个示例代码来显示和处理大量的数据和结果。在日志分析和系统监…

解决方案 TestCenter自动测试软件平台

方案概述 TestCenter是一个专为加速您的测试系统软件开发而设计的自动测试系统软件平台&#xff0c;主要应用于测试程序的开发、运行和管理。TestCenter实现了对测试资源管理、测试程序开发与调试、测试数据管理以及测试程序发布等功能的无缝集成和统一部署&#xff0c;这将帮…

Google I/O 2023 推出Flutter 3.10 快来看看都有哪些变化

本文首发自[慕课网] &#xff0c;想了解更多IT干货内容&#xff0c;程序员圈内热闻&#xff0c;欢迎关注"慕课网"及“慕课网公众号”&#xff01; 作者&#xff1a; CrazyCodeBoy |慕课网名师 今年的Google I/O满满的 AI与狠活&#xff0c;而且还推出 Flutter 3…

Flutter一天一控件之ListTile(列表的实现)

ListTile简介 Flutter中的ListTile控件是一种常用的列表项控件&#xff0c;它可以用于显示列表中的每一个项&#xff0c;通常包含标题、副标题、图标等内容。ListTile控件的外观和行为类似于Android中的ListView中的列表项。 一个简单的ListTile示例&#xff1a; ListTile(l…

大流量卡介绍:网上的大流量卡都是怎么来的?

大流量卡介绍&#xff0c;你知道网上的大流量卡都是怎么来的&#xff1f; 其实&#xff0c;网上29元155G、39元180G的优惠套餐&#xff0c;本身都是我们常见的流量卡如电信星卡、联通王卡、移动花卡等等&#xff0c;之所以这么便宜&#xff0c;只不过运营商在这些套餐上面增加…

【新星计划】数据库 排名函数 初识

数据库 排名函数 初识 查询排序初识排名函数row_number()rank()dense_rank()ntile()percent_rank() 开窗函数为聚合函数使用开窗函数 小结 查询排序 在日常工作中&#xff0c;我们对所有需要的数据都会进行一个排序操作&#xff0c;以获得我们最需要的数据。 排序指令 order …

Unreal Niagara粒子入门1

记录下学习Niagara粒子的过程&#xff0c;这次调的是比较简单的一个效果&#xff1a; 使用了随粒子生命的缩放、打开速率解算、基本的发射器和Niagara容器。 1.创建Niagara Niagara中&#xff0c;发射器和NiagaraSystem文件是可以分开创建的&#xff1a; 通常直接点Niagara…

GPT-2(Transformer Decoder)的TensorFlow实现(附源码)

文章目录 一、GPT2实现步骤二、源码 一、GPT2实现步骤 机器学习模型的开发实现步骤一般都包含以下几个部分&#xff1a;   1. 遵照模型的网络架构&#xff0c;实现每一层&#xff08;Layer/Block&#xff09;的函数&#xff1b;   2. 将第1步中的函数组合在一起&#xff0c…

微信小程序nodejs+vue校园快递代拿系统uniapp校园互助系统

语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 开发软件&#xff1a;VScode 平台旨在解决目前大学生找人帮忙&#xff0c;难&#xff0c;慢&#xff0c;不可靠以及想兼职同学找不到好的平台的问题。对于招人帮忙的…

应急演练脚本编写的几个步骤

应急演练是一项非常重要的活动&#xff0c;对于保障企业的安全和稳定运行至关重要。而一个完整的应急演练需要编写一个详细的脚本来指导演练过程。以下是应急演练脚本编写的几个步骤。 定义演练场景 首先&#xff0c;需要定义演练场景&#xff0c;这将决定演练的目标和方向。在…

美国原装二手 SR560 低噪声电压前置放大器

Stanford Research SR560低噪声电压前置放大器 ​Stanford Research SR560 是一款高性能、低噪声前置放大器&#xff0c;适用于各种应用&#xff0c;包括低温测量、光学检测和音频工程。 SR560 具有一个具有 4 nV/√Hz 输入噪声和 100 MΩ 输入阻抗的差分前端。完整的噪声系数…

三招教你图片文字转语音怎么转

随着数字化时代的到来&#xff0c;人们对于数字信息的获取和处理需求越来越大&#xff0c;而图片文字转语音技术正是为了满足这一需求而诞生的。这项技术不仅可以辅助视力障碍者&#xff0c;让他们能更轻松地获取信息和理解内容&#xff0c;而且也可以帮助正在学习外语的人们练…

Menards EDI对接流程

Menards是一家美国的家居建材零售商&#xff0c;成立于1962年&#xff0c;总部位于美国威斯康星州的伊甸谷市。该公司经营各种家居建材产品&#xff0c;包括木材、地板、墙纸、厨房卫浴用品等&#xff0c;并拥有超过300家门店&#xff0c;分布在美国中西部和北部地区。Menards的…

2023智能座舱新趋势洞察

两年一度的上海车展于4月底正式落幕&#xff0c;怿星科技市场总监老崔率团队奔赴考察&#xff0c;经过多日分析整理&#xff0c;围绕车展发布车型为核心&#xff0c;制作了怿星科技2023智能座舱趋势洞察报告&#xff0c;现将报告分享如下。 01 车展简述 汽车行业进入新时代 本…

DVB-S中卫星通信系统的基带仿真(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 ​数字视频广播(DVB)在卫星通信数字多媒体业务领域应用广泛,其一般采用MPEG-2编码、数字传输和纠错处理等通用技术,然而,当第三方…

LeetCode 117. 填充每个节点的下一个右侧节点指针 II

117. 填充每个节点的下一个右侧节点指针 II 描述 给定一个二叉树&#xff1a; struct Node {int val;Node *left;Node *right;Node *next; }填充它的每个 next 指针&#xff0c;让这个指针指向其下一个右侧节点。如果找不到下一个右侧节点&#xff0c;则将 next 指针设置为 …