一分钟在SpringBoot项目中使用EMQ

news2025/1/11 21:58:27
先展示最终的结果:

生产者端:

@RestController
@RequiredArgsConstructor
public class TestController {

    private final MqttProducer mqttProducer;

    @GetMapping("/test")
    public String test() {
        User build = User.builder().age(100).sex(1).address("世界潍坊渤海之眼").build();
        // 延时发布
        mqttProducer.send("$delayed/10/cookie", 2, JSON.toJSONString(build));
        return "ok";
    }

}

消费者端

/**
 * @author : Cookie
 * date : 2024/1/30
 */
@Component
@Topic("cookie")
public class TestTopicHandler implements MsgHandler {

    @Override
    public void process(String jsonMsg) {
        User user = JSON.parseObject(jsonMsg, User.class);
        System.out.println(user);
    }

}

控制台结果:

在这里插入图片描述

具体解释在之前的笔记中, 需要的话可以查看 EMQ的介绍及整合SpringBoot的使用-CSDN博客


OK, 下面我们就开始实现上面的逻辑, 你要做的就是把 1-9 复制到项目即可

1. 依赖导入
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
2. yml 配置
# 顶格
mqtt:
  client:
    username: admin
    password: public
    serverURI: tcp://192.168.200.128:1883
    clientId: monitor.task.${random.int[10000,99999]} # 注意: emq的客户端id 不能重复
    keepAliveInterval: 10  #连接保持检查周期  秒
    connectionTimeout: 30 #连接超时时间  秒
  producer:
    defaultQos: 2
    defaultRetained: false
    defaultTopic: topic/test1
  consumer:
    consumerTopics: $queue/cookie/#, $share/group1/yfs1024  #不带群组的共享订阅    多个主题逗号隔开
    # $queue/cookie/#
    # 以$queue开头,不带群组的共享订阅   多个客户端只能有一个消费者消费

    # $share/group1/yfs1024
    # 以$share开头,群组的共享订阅 多个客户端订阅
    # 如果在一个组 只能有一个消费者消费
    # 如果不在一个组 都可以消费
3. 属性配置
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt.client")
public class MqttConfigProperties {

    private int defaultProducerQos;
    private boolean defaultRetained;
    private String defaultTopic;
    private String username;
    private String password;
    private String serverURI;
    private String clientId;
    private int keepAliveInterval;
    private int connectionTimeout;

}
4. 定义主题注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Topic {
    String value();
}
5.Mqtt配置类
@Data
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConfig {

    private final MqttConfigProperties configProperties;
    private final MqttCallback mqttCallback;

    @Bean
    public MqttClient mqttClient() {
        try {
            MqttClient client = new MqttClient(configProperties.getServerURI(), configProperties.getClientId(), mqttClientPersistence());
            client.setManualAcks(true); //设置手动消息接收确认
            mqttCallback.setMqttClient(client);
            client.setCallback(mqttCallback);
            client.connect(mqttConnectOptions());
            return client;
        } catch (MqttException e) {
            log.error("emq connect error", e);
            return null;
        }
    }

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(configProperties.getUsername());
        options.setPassword(configProperties.getPassword().toCharArray());
        options.setAutomaticReconnect(true);//是否自动重新连接
        options.setCleanSession(true);//是否清除之前的连接信息
        options.setConnectionTimeout(configProperties.getConnectionTimeout());//连接超时时间
        options.setKeepAliveInterval(configProperties.getKeepAliveInterval());//心跳
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);//设置mqtt版本

        return options;
    }

    public MqttClientPersistence mqttClientPersistence() {
        return new MemoryPersistence();
    }

}

6. 定义消息处理接口
/**
 * 消息处理接口
 */
public interface MsgHandler {

    void process(String jsonMsg) throws IOException;

}
7.定义消息上下文
/**
 * 消息处理上下文, 通过主题拿到topic
 */
public interface MsgHandlerContext{

    MsgHandler getMsgHandler(String topic);

}
8. 定义回调类
@Component
@Slf4j
public class MqttCallback implements MqttCallbackExtended {
    // 需要订阅的topic配置
    @Value("${mqtt.consumer.consumerTopics}")
    private List<String> consumerTopics;

    @Autowired
    private MsgHandlerContext msgHandlerContext;

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("emq error.", throwable);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("topic:" + topic + "  message:" + new String(message.getPayload()));
        //处理消息
        String msgContent = new String(message.getPayload());
        log.info("接收到消息:" + msgContent);
        try {
            // 根据主题名称 获取 该主题对应的处理器对象
            // 多态 父类的引用指向子类的对象
            MsgHandler msgHandler = msgHandlerContext.getMsgHandler(topic);
            if (msgHandler == null) {
                return;
            }
            msgHandler.process(msgContent); //执行
        } catch (IOException e) {
            log.error("process msg error,msg is: " + msgContent, e);
        }
        //处理成功后确认消息
        mqttClient.messageArrivedComplete(message.getId(), message.getQos());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    @Override
    public void connectComplete(boolean b, String s) {
        log.info("连接成功");
        //和EMQ连接成功后根据配置自动订阅topic
        if (consumerTopics != null && consumerTopics.size() > 0) {
            // 循环遍历当前项目中配置的所有的主题.
            consumerTopics.forEach(t -> {
                try {
                    log.info(">>>>>>>>>>>>>>subscribe topic:" + t);
                    // 订阅当前集群中所有的主题 消息服务质量 2 -> 至少收到一个
                    mqttClient.subscribe(t, 2);
                } catch (MqttException e) {
                    log.error("emq connect error", e);
                }
            });
        }
    }

    private MqttClient mqttClient;
	// 在配置类中调用传入连接
    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }
}
8. 消息处理类加载器

作用: 将Topic跟处理类对应 通过 handlerMap

/**
 * 消息处理类加载器
 * 作用:
 * 1. 因为实现了Spring 的 ApplicationContextAware 接口, 项目启动后就会运行实现的方法
 * 2. 获取MsgHandler接口的所有的实现类
 * 3. 将实现类上的Topic注解的值,作为handlerMap的键,实现类(处理器)作为对应的值
 */
@Component
public class MsgHandlerContextImp implements ApplicationContextAware, MsgHandlerContext {
    
    private Map<String, MsgHandler> handlerMap = Maps.newHashMap();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 从spring容器中获取 <所有> 实现了MsgHandler接口的对象
        // key 默认类名首字母小写 value 当前对象
        Map<String, MsgHandler> map = applicationContext.getBeansOfType(MsgHandler.class);
        map.values().forEach(obj -> {
            // 通过反射拿到注解中的值  即 当前类处理的 topic
            String topic =  obj.getClass().getAnnotation(Topic.class).value();
		   // 将主题和当前主题的处理类建立映射
            handlerMap.put(topic,obj);
        });
    }

    @Override
    public MsgHandler getMsgHandler(String topic) {
        return handlerMap.get(topic);
    }
}
9. 封装消息生产者
@Slf4j
@Component
public class MqttProducer {
	// @Value() 读取配置 当然也可以批量读取配置,这里就一个一个了
    @Value("${mqtt.producer.defaultQos}")
    private int defaultProducerQos;
    @Value("${mqtt.producer.defaultRetained}")
    private boolean defaultRetained;
    @Value("${mqtt.producer.defaultTopic}")
    private String defaultTopic;

    @Autowired
    private MqttClient mqttClient;

    public void send(String payload) {
        this.send(defaultTopic, payload);
    }

    public void send(String topic, String payload) {
        this.send(topic, defaultProducerQos, payload);
    }

    public void send(String topic, int qos, String payload) {
        this.send(topic, qos, defaultRetained, payload);
    }

    public void send(String topic, int qos, boolean retained, String payload) {
        try {
            mqttClient.publish(topic, payload.getBytes(), qos, retained);
        } catch (MqttException e) {
            log.error("publish msg error.",e);
        }
    }

    public <T> void send(String topic, int qos, T msg) throws JsonProcessingException {
        String payload = JsonUtil.serialize(msg);
        this.send(topic,qos,payload);
    }
}

最终的实现的结果

  • 生产者端: 在需要发送消息的地方注入 MqttProducer 发送消息
  • 消费者端: 在需要处理对应主题的类上 实现 MsgHandler接口
代码示例
生产者端
@RestController
@RequiredArgsConstructor
public class TestController {

    private final MqttProducer mqttProducer;

    @GetMapping("/test")
    public String test() {
        User build = User.builder().age(100).sex(1).address("世界潍坊渤海之眼").build();
        // 延时发布
        mqttProducer.send("$delayed/10/cookie", 2, JSON.toJSONString(build));
        return "ok";
    }
}
消费者端
@Component
@Topic("cookie")
public class TestTopicHandler implements MsgHandler {

    @Override
    public void process(String jsonMsg) {
        User user = JSON.parseObject(jsonMsg, User.class);
        System.out.println(user);
    }
    
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1421358.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Wireshark网络协议分析 - UDP协议

在我的博客阅读本文 文章目录 1. 基础2. 实战2.1. 用Go写一个简单的UDP服务器与客户端2.2. Wireshark抓包分析 3. UDP与TCP的区别4. 参考资料 1. 基础 UDP包的数据结构&#xff1a; 2. 实战 2.1. 用Go写一个简单的UDP服务器与客户端 我们这里使用Golang写了一个简单的9830端…

docker由浅入深

一、什么是docker docker 顾名思义就是轮船的意思&#xff0c;轮船我们知道是通过集装箱运载货物的东西&#xff0c;那么docker其实也是类似的东西&#xff0c;只是装载的是虚拟的运行程序罢了。其中集装箱在docker里面被称为container&#xff08;后面以容器称之&#xff09;…

林浩然的Python奇遇记:从小白到科学界的编程魔法师

林浩然的Python奇遇记&#xff1a;从小白到科学界的编程魔法师 Lin Haoran’s Python Adventure: From Novice to Programming Sorcerer in the Scientific Realm 在那个平凡的午后&#xff0c;我们的主角林浩然——一个对编程一窍不通但对世界充满好奇的物理学大二学生&#x…

qiankun子应用静态资源404问题有效解决(遇到了http请求静态文件404、css文件引用图片无法显示、svg图片转换成 base64无法显示等问题)

在&#x1f449;&#x1f3fb; qiankun微前端部署&#x1f448;&#x1f3fb;这个部署方式的前提下&#xff0c;遇到的问题并解决问题的过程 >> 问题现象 通过http请求本地的静态json文件404 css中部分引入的图片无法显示 >> 最开始的解决方式 在&#x1f449;…

Linux第36步_创建正点原子的TF-A工作区

创建正点原子的TF-A工作区&#xff0c;目的是想查看正点原子的设备树文件“stm32mp157d-atk.dts”和设备树头文件“stm32mp157d-atk.dtsi”&#xff0c;了解设备树是什么样子&#xff0c;为后期基于“ST公司的源码”创建自己的设备树提供参考&#xff0c;同时也是为了学习移植u…

飞桨大模型分布式训练技术

今天我为大家介绍飞桨大模型分布式训练技术&#xff0c;内容分为以下几个部分&#xff1a; 首先&#xff0c;我会介绍大模型训练面临的重点难题&#xff1b;然后&#xff0c;为大家介绍飞桨在大模型训练领域的特色分布式训练技术和优化方案&#xff1b;最后&#xff0c;伴随着…

NeRF:神经辐射场复杂场景的新视图合成技术

NeRF&#xff1a;神经辐射场复杂场景的新视图合成技术 NeRF&#xff1a;神经辐射场复杂场景的新视图合成技术项目背景与意义如何运行&#xff1f;快速开始更多数据集 预训练模型方法与实现结语服务 NeRF&#xff1a;神经辐射场复杂场景的新视图合成技术 在计算机视觉领域&…

工业物联网网关如何实现工业设备的远程运维?-天拓四方

随着工业4.0和智能制造的快速发展&#xff0c;工业设备的远程运维已经成为提高企业生产效率和降低运营成本的重要手段。工业物联网网关作为这一过程中的关键组件&#xff0c;发挥着不可或缺的作用。本文将重点探讨工业物联网网关如何实现工业设备的远程运维&#xff0c;并结合实…

网络和Linux网络_15(IO多路转接)reactor编程_服务器+相关笔试题

目录 1. reactor的服务器 1.1 Sock.hpp 1.2 加协议分割报文 1.3 序列化和反序列化 Protocol.hpp main.cc Epoll.hpp TcpServer.hpp 2. 相关笔试题 答案及解析 本篇完。 1. reactor的服务器 Log.hpp和以前一样&#xff0c;因为下面要写ET模式所以Sock.hpp加了一个把…

[Python] 什么是集成算法,什么是随机森林?随机森林分类器(RandomForestClassifier)及其使用案例

什么是集成算法&#xff1f; 集成算法是一种机器学习方法&#xff0c;它将多个基本的学习算法&#xff08;也称为弱学习器&#xff09;组合在一起&#xff0c;形成一个更强大的预测模型。集成算法通过对基本模型的预测进行加权平均或多数投票等方式&#xff0c;来产生最终的预…

【Leetcode】两数之和

目录 题目&#xff1a; 解法1&#xff1a;暴力双for 1.想到的第一种方法两for循环解 复杂度分析 解法2&#xff1a;hash表 总结&#xff1a; 笔记&#xff1a; 题目&#xff1a; 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标…

简单区间DP

文章目录 什么是区间DpAcWing 282. 石子合并题意分析思路解析状态表示状态计算 CODE需要注意的问题 什么是区间Dp 区间Dp指的是某些问题可以用区间来划分解决。 AcWing 282. 石子合并 题目链接&#xff1a;穿梭时间的画面的钟 题意分析 从一排石子中选择相邻的两堆进行合并…

2024-01-25 力扣高频SQL50题目1193每月交易

1.1193每月交易 1 count可以这样用。。 COUNT(IF(state approved, 1, NULL)) AS approved_count 如果 COUNT(if(state approved,1,0))&#xff0c;这里变成0&#xff0c;就不对了。因为count计数时候&#xff0c;只要里面不是null&#xff0c;就会算进去。 sum(if(state …

(学习日记)2024.01.27

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

行为型设计模式—迭代器模式

迭代器模式&#xff1a;也叫作游标模式&#xff0c;能在不暴露复杂数据结构内部细节的情况下遍历其中所有的元素。在迭代器的帮助下&#xff0c; 客户端可以用一个迭代器接口以相似的方式遍历不同集合中的元素。 当集合背后为复杂的数据结构&#xff0c;且希望对客户端隐藏其复…

漏洞原理文件上传漏洞

一 文件上传漏洞介绍&#xff08;理论&#xff09; 文件上传漏洞是一种常见的web应用程序漏洞&#xff0c;允许攻击者向服务器上传恶意文件。这种漏洞可在没有恰当的安全措施的情况下&#xff0c;将任意类型的文件上传到服务器上&#xff0c;从而可能导致以下安全问题&#xff…

【lesson1】高并发内存池项目介绍

文章目录 这个项目做的是什么&#xff1f;这个项目的要求的知识储备和难度&#xff1f;什么是内存池池化技术内存池内存池主要解决的问题malloc 这个项目做的是什么&#xff1f; 当前项目是实现一个高并发的内存池&#xff0c;他的原型是google的一个开源项目tcmalloc&#xf…

Python 字典及常见应用(Python Dctionary)

字典是python的内置基本数据类型之一&#xff0c;其他语言中可能会被称为“关联存储”或“关联数组”。它是一种映射关系&#xff0c;以包含在{}中的"键:值"对表示。字典是一种可变对象&#xff0c;键没有顺序。其主要用途是通过关键字存储、提取值。 目录 一、字典的…

3分钟搞定幻兽帕鲁联机,一键部署专属服务器

3分钟搞定幻兽帕鲁联机&#xff0c;一键部署专属服务器 访问帕鲁专题活动页 登录阿里云官网&#xff0c;用钉钉或者支付宝app扫码注册新用户&#xff08;新用户福利较多&#xff0c;优惠力度大&#xff09; 进入阿里云游戏联机服务器专题页&#xff0c;点击 一键购买及部署 即可…

记录一条sql查询:以逗号隔开的id字符串的查询

目录 前言表结构sql语句 前言 在一个项目中有两张表&#xff0c;一张是商品码表&#xff0c;一张是记录出库单明细的出库记录表&#xff0c;记录表中有一个字段保存了以逗号隔开的商品码表的id字符串&#xff0c;需要根据出库明细id查找到对应出库的商品码。 表结构 goods_det…