从零开始搭建属于自己的物联网平台(二)实现基于订阅发布的消息总线

news2025/1/11 6:04:24

实现基于订阅发布的消息总线

  • 往期链接
  • 实现的功能及形式
  • 功能设计及代码实现
    • 生产者
      • 使用redis实现生产者
    • 消费者
      • 使用redis实现消费者
    • 配套@Subscribe注解
      • 实现BeanPostProcessor监听所有的bean创建
    • EventBus对象

往期链接

从零开始搭建属于自己的物联网平台(一)需求分析以及架构设计

实现的功能及形式

  • 以stater方式实现
  • 支持市面上常见的发布订阅的三方组件,比如各种MQ,redis stream等
  • 提供注解,快速实现订阅,同样支持代码来动态添加订阅

功能设计及代码实现

首先,像这种平台的功能,一定要做好抽象设计,预留好功能扩展的接口

处理流程图
在这里插入图片描述

这里生产者处理逻辑比较简单,当设备消息或者其他业务的消息产生的时候,调用生产者实现的发布方法将消息推送至消息总线。
消费者这里处理就稍微复杂一些,首先在消费者服务启动时需要先扫描所有的服务内的订阅者(一个@Subscribe修饰的方法就是一个订阅者,为了防止混淆这里将这些订阅者称为MessageAdapter),将这些Adapter进行注册,并根据订阅的topic做好分组(有可能会有多个Adapter订阅同一个topic),消费者持续监听消息总线,如果有是多个实例那么做好分组,当订阅的消息到来之后,首先做消息的解析,解析出内部消息的topic,再从内存中取得对应的Adapter做服务内部的消息多播(这里其实就是仿照的spring event的处理逻辑),Adapter接收到多播消息之后再进行对应的业务处理。

生产者

定义生产者抽象接口,以后兼容的第三方组件只要实现该接口就好。

/**
 * 消息总线代码发布
 */
public interface MessagePublisher {

    /**
     * 消息发布
     *
     * @param message 消息体
     * @return 发布结果
     */
    Boolean publish(AdapterMessage message);
}

使用redis实现生产者

这里使用@ConditionalOnProperty来修饰该实现类,这样目的是让redis stream作为消息总线的默认实现,如果后续添加别的实现,可以在配置文件中便利的配置。

/**
 * 消息总线发布Redis实现
 */
@Component
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
public class RedisMessagePublisher implements MessagePublisher {

    @Value("${stream.topic:/stream}")
    String topic;

    private final RedisConnectionFactory connectionFactory;

    public RedisMessagePublisher(RedisConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Override
    public Boolean publish(AdapterMessage message) {
        Map value = new HashMap();
        value.put("topic".getBytes(), message.getTopic().getBytes());
        value.put("payload".getBytes(), message.getPayload().getBytes());
        ByteRecord byteRecord = StreamRecords.rawBytes(value).withStreamKey(topic.getBytes());
        // 刚追加记录的记录ID
        RecordId recordId = connectionFactory.getConnection().xAdd(byteRecord);
        return true;
    }
}

消费者

定义消费者抽象接口,这个接口是为了后续手动订阅准备的。

/**
 * 手动订阅消息总线消费者
 */
public interface DynamicSubscribeCustomer {

    String consume(AdapterMessage message) throws IOException;
}

订阅者组内多播处理抽象类

public abstract class EventMessageBroker {

    /**
     * 消息多播
     *
     * 将集群间消息在自己服务内部广播,是的所有订阅消息都可以收到
     *
     * @param message
     * @return
     */
    public abstract void multicastEvent(AdapterMessage message) throws IllegalAccessException, InstantiationException, InvocationTargetException, IOException;

    public void doMulticastEvent(AdapterMessage message, Map<TopicMatcher, Map<String, MessageAdapter>> subscribesPool, ApplicationContext applicationContext) throws InvocationTargetException, IllegalAccessException, InstantiationException, IOException {
        // 查找相匹配的Adapter
        List<MessageAdapter> adapterList = TopicUtil.getAllMatchedAdapter(message.getTopic(), subscribesPool);

        for (MessageAdapter adapter : adapterList) {
            if (adapter != null && adapter.isActive()) {
                // 手动订阅消息
                if (adapter.getCustomer() != null) {
                    adapter.getCustomer().consume(message);
                } else {
                    // 取得对象
                    Object instance = applicationContext.getBean(adapter.getClazz());
                    if (instance != null) {
                        // 将消息转化为所需要的类型
                        if (adapter.getMethod().getParameterCount() > 0) {
                            Class<?> param = adapter.getMethod().getParameterTypes()[0];
                            adapter.getMethod().invoke(instance, JSON.parseObject(message.getPayload(), param));
                        } else {
                            adapter.getMethod().invoke(instance);
                        }

                    }
                }
            }

        }

    }
}

使用redis实现消费者

/**
 * 消息总线-集群间消息redis实现
 *
 * 实现规则
 *     根据不同的MQ机制实现消息的分组消费,将原始消息转化为AdapterMessage,然后使用继承于EventMessageBroker进行消息多播
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
public class RedisMessageBroker  extends EventMessageBroker implements StreamListener<String, MapRecord<String, String, String>> {

    @Value("${stream.topic:/stream}")
    String topic;

    @Value("${stream.consumer.id}")
    String consumerId;

    @Value("${stream.timeout:10}")
    Integer timeout;

    @Value("${stream.consumer.group}")
    String groupName = "stream.redis";

    private final RedisTemplate<String, String> streamRedisTemplate;
    private final ApplicationContext applicationContext;
    private final EventBus eventBus;

    public RedisMessageBroker(RedisTemplate<String, String> streamRedisTemplate, ApplicationContext applicationContext, EventBus eventBus) {
        this.streamRedisTemplate = streamRedisTemplate;
        this.applicationContext = applicationContext;
        this.eventBus = eventBus;
    }


    @SneakyThrows
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        log.info("消息内容-->{}", message.getValue());
        StreamOperations<String, String, String> streamOperations = streamRedisTemplate.opsForStream();

        // 服务内消息多播
        AdapterMessage adapterMessage = new AdapterMessage();
        adapterMessage.setTopic(message.getValue().get("topic"));
        adapterMessage.setPayload(message.getValue().get("payload"));
        try {
            this.multicastEvent(adapterMessage);
        } catch (Exception e) {
            log.info("消息多播失败:" + e.getLocalizedMessage());
        }
        //消息应答
        streamOperations.acknowledge(topic, groupName, message.getId());
    }


    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> emailListenerContainerOptions() {

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block读取超时时间
                .pollTimeout(Duration.ofSeconds(timeout))
                //count 数量(一次只获取一条消息)
                .batchSize(1)
                //序列化规则
                .serializer(stringRedisSerializer)
                .build();
    }

    /**
     * 开启监听器接收消息
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                                                            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions) {
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 创建 stream 流
        if (!streamRedisTemplate.hasKey(topic)) {
            streamRedisTemplate.opsForStream().add(topic, Collections.singletonMap("", ""));
            log.info("初始化集群间通信Topic{} success", topic);
        }

        //创建消费者组
        try {
            streamRedisTemplate.opsForStream().createGroup(topic, groupName);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在", groupName);
        }

        //注册消费者 消费者名称,从哪条消息开始消费,消费者类
        // > 表示没消费过的消息
        // $ 表示最新的消息
        listenerContainer.receive(
                Consumer.from(groupName, consumerId),
                StreamOffset.create(topic, ReadOffset.lastConsumed()),
                this
        );

        listenerContainer.start();
        return listenerContainer;
    }

    @Override
    public void multicastEvent(AdapterMessage message) throws IllegalAccessException, InstantiationException, InvocationTargetException, IOException {
        super.doMulticastEvent(message, eventBus.getSubscribesPool(), applicationContext);
    }
}

配套@Subscribe注解

/**
 * 自定义消息订阅
 *
 * @author liuhl
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@SuppressWarnings("unused")
public @interface Subscribe {

    /**
     * 订阅topic
     * @return
     */
    @AliasFor("value")
    String topic() default "";

    @AliasFor("topic")
    String value() default "";

}

实现BeanPostProcessor监听所有的bean创建

@Component
@Slf4j
public class SpringMessageSubscribe implements BeanPostProcessor {

    private final EventBus eventBus;

    private final MessageAdapterCrater messageAdapterCrater = new DefaultMessageAdapterCrater();

    public SpringMessageSubscribe(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * 为注解添加监听处理
     *
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> type = ClassUtils.getUserClass(bean);

        ReflectionUtils.doWithMethods(type, method -> {
            // 取得所有订阅方法
            AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
            if (CollectionUtils.isEmpty(subscribes)) {
                return;
            }
            // 生成订阅Adapter
            MessageAdapter sub = messageAdapterCrater.createMessageAdapter(type, method, subscribes.getString("topic"));
            // 注册订阅Adapter
            eventBus.addAdapter(sub, subscribes.getString("topic"));
        });
        return bean;
    }
}

EventBus对象

EventBus实体,主要负责三个功能,发布、订阅的入口,以及管理维护MessageAdapter

/**
 * 消息总线
 */
@Component
public class EventBus {

    private final MessagePublisher messagePublisher;

    private Map<TopicMatcher, Map<String, MessageAdapter>> subscribesPool = new ConcurrentHashMap();

    public EventBus(MessagePublisher messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    /**
     * 集群间消息发布
     *
     * @param message 消息体
     * @return 发布结果
     */
    public Boolean publish(AdapterMessage message) {
        return messagePublisher.publish(message);
    }

    /**
     * 将Adapter注册到EventBus
     *
     * @param adapter
     * @param topic
     */
    public void addAdapter(MessageAdapter adapter, String topic) {
        if (subscribesPool.get(topic) != null) {
            subscribesPool.get(topic).put(adapter.getId(), adapter);
        } else {
            Map<String, MessageAdapter> adapterMap = new HashMap<>();
            adapterMap.put(adapter.getId(), adapter);
            subscribesPool.put(new TopicMatcher(topic), adapterMap);
        }
    }

    /**
     * 将Adapter注册到EventBus
     *
     * @param adapter
     * @param topic
     */
    public void removeAdapter(MessageAdapter adapter, String topic) {
        if (subscribesPool.get(topic) != null) {
            subscribesPool.get(topic).remove(adapter.getId());
        }
    }

    public Map<TopicMatcher, Map<String, MessageAdapter>> getSubscribesPool() {
        return this.subscribesPool;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/582444.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试类型(单元、集成、系统或手动测试)

测试类型(单元、集成、系统或手动测试) 单元测试 单元是系统的单个组件&#xff0c;例如类或单个方法。孤立地测试单元称为单元测试。 优点&#xff1a;速度快/易控/易写 缺点&#xff1a;缺乏现实性/无法捕获所有错误&#xff08;例如与其他组件或服务的交互&#xff09; 单元…

40. 组合总和 II

40. 组合总和 II 回溯 去重

springboot+vue+java旅行旅游景点酒店预订出行订票系统eaog5

线上旅行信息管理系统要求实现以下功能&#xff1a; a.景点管理&#xff0c;展示景点的基础信息&#xff0c;介绍等信息。 b.酒店管理,展示酒店的基础信息&#xff0c;介绍等信息。 c.评价管理&#xff0c;可以查看景点或酒店的相关评价信息&#xff0c;客户消费完&#xff0c;…

Vue3+express实现动态编辑element-plus组件tag标签和select下拉框

需求是利用element-plusd的组件标签tag去实现增加部门的种类&#xff0c;效果图如下&#xff1a; ①在系统设置中添加/删减对应的部门 ②在部门下拉框中弹出自己设置的部门 实现的思路是&#xff1a;通过系统设置中的部门设置增删部门&#xff0c;更新数据库中的部门设置字段…

【Vue2.0源码学习】虚拟DOM篇-Vue中的DOM-更新子节点

文章目录 1. 前言2. 更新子节点3. 创建子节点4. 删除子节点5. 更新子节点6. 移动子节点7. 回到源码8. 总结 1. 前言 在上一篇文章中&#xff0c;我们了解了Vue中的patch过程&#xff0c;即DOM-Diff算法。并且知道了在patch过程中基本会干三件事&#xff0c;分别是&#xff1a;…

小型流水线模型的制作

1. 功能说明 本文示例将实现R327a样机——一款5工序的小型流水线模型&#xff0c;包含铸锭送料、传送、搬运、模拟加工、码垛5个工序。 2. 结构说明 小型流水线主要是由铸锭送料结构、传送机构、搬运机构、模拟加工机构、码垛机构5部分组成。 3. 电子硬件 在这个示例中&#xf…

Qt文件系统源码分析—第七篇QFileSelector

深度 本文主要分析Windows平台&#xff0c;Mac、Linux暂不涉及 本文只分析到Win32 API/Windows Com组件/STL库函数层次&#xff0c;再下层代码不做探究 本文QT版本5.15.2 类关系图 QTemporaryFile继承QFile QFile、QSaveFile继承QFileDevice QFileDevice继承QIODevice Q…

【SpringMVC】| 一文带你搞定SpringMVC的@RequestMapping注解

目录 环境搭建 Request注解的功能 1. RequestMapping注解的位置 2、RequestMapping注解的【value】属性 3、RequestMapping注解的【method】属性 4、RequestMapping注解的【params】属性&#xff08;了解&#xff09; 5、RequestMapping注解的【headers】属性&#xff0…

Attention 和 Transformer

本文参考d2l&#xff0c;搭配知识点和代码&#xff0c;助力一口气搞懂Transformer&#xff0c;参考&#xff1a; chapter_attention-mechanisms-and-transformers https://d2l.ai/chapter_attention-mechanisms-and-transformers/index.html 目录如下&#xff1a; x.1 Queri…

项目复盘四步:怎么做才有成效?这些关键点不可忽略

在项目管理中&#xff0c;及时复盘是非常重要的&#xff0c;因为只有通过反思和分析&#xff0c;才能找到差距存在的原因。 复盘分析的第一步是回顾目标 因为目标是工作开展的关键。在执行项目的过程中&#xff0c;要始终朝着所设定的目标去努力实现。计划和现实会存在偏差&…

超详细Zookeeper+Kafka+ELK集群部署

一、Zookeeper 概述 1、Zookeeper 定义 Zookeeper是一个开源的分布式的&#xff0c;为分布式框架提供协调服务的Apache项目。 2、Zookeeper 工作机制 Zookeeper从设计模式角度来理解&#xff1a;是一个基于观察者模式设计的分布式服务管理框架&#xff0c;它负责存储和管理…

AutoCV第九课:ML基础

目录 矩阵运算前言1. 矩阵乘法和求导总结 矩阵运算 前言 手写AI推出的全新保姆级从零手写自动驾驶CV课程&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考。 本次课程主要学习矩阵运算的基础&#xff0c;考虑使用矩阵来表达多个线性回归模型。 课程大纲可看下面…

多元回归预测 | Matlab龙格算法(RUN)优化最小二乘支持向量机回归预测,RUN-LSSVM回归预测,多变量输入模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元回归预测 | Matlab龙格算法(RUN)优化最小二乘支持向量机回归预测,RUN-LSSVM回归预测,多变量输入模型 评价指标包括:MAE、RMSE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 部分源码 %--…

注意力机制(一)SE模块(Squeeze-and-Excitation Networks)论文总结和代码实现

Squeeze-and-Excitation Networks&#xff08;压缩和激励网络&#xff09; 论文地址&#xff1a;Squeeze-and-Excitation Networks 论文中文版&#xff1a;Squeeze-and-Excitation Networks_中文版 代码地址&#xff1a;GitHub - hujie-frank/SENet: Squeeze-and-Excitation Ne…

git 文件恢复与项目还原:008

1. 【文件恢复】&#xff1a;将文件恢复到上一次提交的状态 注意&#xff1a;新建且没有提交的文件无法使用文件恢复 命令&#xff1a; git checkout -- 文件名假如我们的一开始是这样的&#xff0c;这是没有报错的状态文件 然后我添加了一段内容&#xff0c; 比如我添加这段内…

做外贸算运费的时候需不需要多算一些

看到一个网友在一篇文章下留言说&#xff1a;客户算运费的时候需不需要多算一些 听公司老员工说给客户算运费要多加20% 这样合适吗 我个人感觉有点离谱。 那我们就这个话题&#xff0c;谈一谈运费是否要多加一些呢&#xff1f;为什么要多加一些&#xff1f; 首先&#xff0c;要…

Zookeeper学习---3、服务器动态上下线监听案例、ZooKeeper 分布式锁案例、企业面试真题

1、服务器动态上下线监听案例 1、需求 某分布式系统中&#xff0c;主节点可以有多台&#xff0c;可以动态上下线&#xff0c;任意一台客户端都能实时感知到主节点服务器的上下线。 2、需求分析 3、具体实现 &#xff08;1&#xff09;先在集群上创建/servers 节点 &#xff…

软考A计划-试题模拟含答案解析-卷八

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&am…

2023年上半年系统集成项目管理工程师下午真题及答案解析

试题一(18分) A公司跨国收购了B公司的主营业务&#xff0c;保留了B公司原有的人员组织结构和内部办公系统。为了解决B公司内部办公系统与A公司原有系统不兼容的问题&#xff0c;财务、人力和行政部门联合向公司高层申请尽快启动系统和业务的整合。 A公司领导指定HR总监王工担…

云容灾部署前的准备指南

据ITIC的研究表明&#xff0c;98%的千人规模企业每年都会遭遇停机危机&#xff0c;每停机一小时就会损失约700,000人民币。当灾难发生时&#xff0c;使用云容灾的企业可以通过云平台提供的资源和服务&#xff0c;快速帮助企业恢复业务。 HyperBDR云容灾&#xff0c;深度对接全…