Mqtt消费端实现的几种方式

news2024/9/21 20:50:57

此处测试的mqtt的Broker是使用的EMQX 5.7.1,可移步至https://blog.csdn.net/tiantang_1986/article/details/140443513查看详细介绍

一、方式1

添加必要的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

# mqtt 服务端配置
spring:
  # mqtt 配置
  mqtt:
    url: tcp://127.0.0.1:1883,tcp://127.0.0.2:1883
    clientId: "00000001"       # 客户端Id(不可重复)
    username: <访问用户名>      # 认证的用户名
    password: <访问密码>        # 认证的密码
    qos: 1
    topic: test/#              # 监听的topic

读取配置文件

import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {
    private String username;
    private String password;
    private String url;
    private String clientId;
    private String topic;
    private Integer qos;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        if (StringUtils.isNotBlank(url) && url.contains(",")) {
            options.setServerURIs(url.split(","));
        } else {
            options.setServerURIs(new String[]{url});
        }        
        options.setCleanSession(true);
        //自动重连
        options.setAutomaticReconnect(true);
        //设置超时时间,单位为秒
        options.setConnectionTimeout(0);
        //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(90);
        //设置遗嘱消息
        options.setWill("will_topic", (this.clientId + "与服务器断开连接").getBytes(), qos, false);

        factory.setConnectionOptions(options);
        factory.setPersistence(new MemoryPersistence());
        return factory;
    }
}

MQTT消息入站配置

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
    @Resource
    private MqttConfig mqttConfig;
    @Resource
    private MqttPahoClientFactory mqttClientFactory;
    @Resource
    private MqttMessageReceiver mqttMessageReceiver;

    @Bean
    public MessageChannel mqttInBoundChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory, mqttConfig.getTopic());
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        //传输Hex数据,如果是String则可使用默认值false
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setRecoveryInterval(10000);
        adapter.setQos(mqttConfig.getQos());
        adapter.setOutputChannel(mqttInBoundChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInBoundChannel")
    public MessageHandler mqttMessageHandler() {
        return this.mqttMessageReceiver;
    }
}

消费者

@Slf4j
@Component
public class MqttMessageReceiver implements MessageHandler {
    @Resource
    private DataConvertStrategyFactory convertStrategyContext;

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        MessageHeaders headers = message.getHeaders();
        String topic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
        if (StringUtils.isNotBlank(topic)) {
            return;
        }
        byte[] payload = (byte[]) message.getPayload();
        log.info("topic: {}, message: {}", topic, HexUtils.bytesToHex(payload));
        //从topic中获取clientId,topic的格式:{业务}/{clientId}/{事件标识}
        Map<String, String> map = MqttDataConverter.covertTopic(topic);
        String clientId = map.get("clientId");
        log.info("clientId: {}", clientId);
		//topic中的事件标识
        String eventUrl = map.get("event");
		//自定义的enum,主要用来消息处理消息分组,相同组可以使用相同的数据转换服务
        Event[] events = Event.values();
        String deviceId = clientId;
        Arrays.stream(events).filter(item -> item.getEvent().equals(eventUrl)).findFirst().ifPresent(item -> {
        	//使用策略模式实现
            DataConvertService convertService = convertStrategyContext.getStrategy(item.getGroup());
            convertService.convert(deviceId, eventUrl, payload);
        });
    }
}

数据转换服务接口,具体的数据解析只要实现这个接口就行

public interface DataConvertService {
    /**
     * 转换数据
     *
     * @param clientId 设备SN
     * @param topic  topic
     * @param data  数据
     * @return
     */
    Boolean convert(String clientId, String topic, byte[] data);

    /**
     * 获取转换器
     *
     * @return
     */
    String getConverter();
}

MQTT数据转换策略工厂

@Component
public class DataConvertStrategyFactory implements InitializingBean {
    @Resource
    private List<DataConvertService> handlers;
    private Map<String, DataConvertService> dataConvertServiceMap = new ConcurrentHashMap<>();

    /**
     * 初始化
     */
    @Override
    public void afterPropertiesSet() {
        //进行初始化
        if (CollectionUtils.isNotEmpty(handlers)) {
            handlers.forEach(item -> {
                dataConvertServiceMap.put(item.getConverter(), item);
            });
        }
    }

    /**
     * 返回实际处理对象
     *
     * @param strategy 处理策略
     * @return 实际处理对象
     */
    public DataConvertService getStrategy(String strategy) {
        return dataConvertServiceMap.get(strategy);
    }
}

二、方式2

使用EMQXWebhook钩子
首先创建钩子函数,把需要监听的事件加上处理逻辑,示例:

@Slf4j
@RequestMapping("/mqtt/client")
@RestController
public class ClientController {
	@PostMapping("/webhook")
	public Result webhook(@RequestBody Map<String, Object> message) {
        log.info("webhook map:{}", message);
        String action = (String) message.get("action");
        String clientid = (String) message.get("clientid");
        if ("client_connected".equals(action)) {
            log.info("client:{} 上线", clientid);
        }
        if ("client_disconnected".equals(action)) {
            log.info("client:{} 下线", clientid);
        }
        if ("message.publish".equals(action)) {
            log.info("已接收到 client:{} 的消息:{}", clientid, message.get("payload"));
        }
        return Result.success("OK");
    }
}

然后在EMQX的Dashboard中创建Webhook,可以选择多个触发器
在这里插入图片描述
填好URL后可以进行测试,之后使用MQTTX进行消息发送测试
在这里插入图片描述
控制台输出日志
在这里插入图片描述

三、方式3

package com.iinplus.mqtt.handler;

import com.iinplus.mqtt.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class MqttSubscriber implements InitializingBean {
    @Resource
    private MqttConfig config;

    @Override
    public void afterPropertiesSet() {
        try {
            MqttClient client = new MqttClient(config.getUrl(), config.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(config.getUsername());
            options.setPassword(config.getPassword().toCharArray());
            options.setCleanSession(true);
            options.setAutomaticReconnect(true);
            options.setConnectionTimeout(0);
            client.connect(options);
            client.subscribe(config.getTopic());
			//设置消息回调
            client.setCallback(new MqttMsgHandler());
        } catch (MqttException e) {
            log.error("MqttException:", e);
        }
    }
}

消息回调处理

@Slf4j
public class MqttMsgHandler implements MqttCallback {
    @Override
    public void connectionLost(Throwable t) {
        // 连接丢失
        log.info("Connection lost:", t);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // 接收到消息
        log.info("Message arrived:" + new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // 消息发送成功
        log.info("Delivery complete");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2103043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蒸馏之道:如何提取白酒中的精华?

在白酒的酿造过程中&#xff0c;蒸馏是一道至关重要的工序&#xff0c;它如同一位技艺精细的炼金术士&#xff0c;将原料中的精华提炼出来&#xff0c;凝聚成滴滴琼浆。今天&#xff0c;我们就来探寻这蒸馏之道&#xff0c;看看豪迈白酒&#xff08;HOMANLISM&#xff09;是如何…

Linux 学习之路 - 信号的保存

前面已经介绍过信号的产生&#xff0c;本文将继续介绍信号的保存与处理。 1、上篇文章的遗留问题 从上篇文章(Linux学习之路 -- 信号概念 && 信号的产生-CSDN博客)中&#xff0c;其实还遗留了一些问题。OS在接受到信号后&#xff0c;大部分的进程的处理方式都是终止进…

合宙低功耗4G模组Air780E——产品规格书

Air780E 是合宙通信推出的 LTE Cat.1 bis通信模块&#xff1b; 采用移芯EC618平台&#xff0c;支持 LTE 3GPP Rel.13 技术。 Air780E特点和优势总结如下&#xff1a; 全网通兼容性&#xff1a; 作为4G全网通模块&#xff0c;兼容不同运营商网络&#xff0c;包括但不限于移动、…

【C++ Primer Plus习题】10.1

问题: 解答: main.cpp #include <iostream> #include "BankAccount.h" using namespace std;int main() {BankAccount BA1("韩立","韩跑跑",1);BA1.get_info();BankAccount BA;BA.init_account("姚国林", "amdin", 1…

国际化产品经理的挑战与机遇:跨文化产品管理的探索

全球化背景下的产品管理变革 在当今全球化的背景下&#xff0c;科技的进步和通信技术的普及&#xff0c;使得世界变得更加紧密相连。产品不再仅仅局限于单一市场&#xff0c;而是面向全球用户&#xff0c;这对产品经理提出了新的挑战与机遇。跨文化的产品管理要求产品经理不仅…

09-03 周二 ansible部署和节点管理过程

09-03 周二 ansible部署和节点管理过程 时间版本修改人描述2024年9月3日10:08:58V0.1宋全恒新建文档&#xff0c; 简介 首先要找一个跳板机&#xff0c;来确保所有的机器都可以访问。然后我们围绕ansible来搭建环境&#xff0c;方便一键执行所有的命令&#xff0c;主要的任务是…

通信算法之232: 无线发射功率和信号强度,常用单位dB、dBm、dBi和dBd介绍

[转载] 无线功率和信号强度的基本概念 在无线网络中&#xff0c;使用AP设备和天线来实现有线和无线信号互相转换。如下图所示&#xff1a; 有线网络侧的数据从AP设备的有线接口进入AP后&#xff0c;经AP处理为射频信号&#xff0c;从AP的发送端&#xff08;TX&#xff09;经过…

JAVA-JVM 内存模型类加载器GC算法GC调优

JAVA-JVM 内存模型&类加载器&GC算法&GC调优 什么是JVM JVM 内存模型 JVM的GC算法 JVM类加载器 什么是JVM ? [[jvm]]是Java Virtual Machine&#xff08;Java虚拟机&#xff09;的缩写&#xff0c;JVM是一个虚构出来的计算机&#xff0c;有着自己完善的硬件架构&a…

Qwen-7B-Chat大模型安装训练推理-helloworld

初始大模型之helloworld编写 开发环境&#xff1a;modelscope GPU版本上测试的&#xff0c;GPU免费36小时 ps:可以不用conda直接用环境自带的python环境使用 魔搭社区 安装conda wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh 1.2 bash Mini…

港科夜闻 | 香港科大举办开学嘉年华,叶玉如校长勉励新生发掘潜能传承凡事皆可为精神...

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科大举办开学嘉年华&#xff0c;叶玉如校长勉励新生发掘潜能传承「凡事皆可为」精神。迎接新学年&#xff0c;香港科大于9月2日起举行为期两天的开学嘉年华「Fire Up Your Year」&#xff0c;校长叶玉如教授联同一众…

AI写作保姆级方法论第六节-AI的终极调教心法(问题+解决方案)

效果是什么 大象基于大量的实战经验&#xff0c;总结出了AI prompt调教的终极杀手锏&#xff1a;【终极调教心法&#xff1a;1个原则和3个技巧】 一个原则&#xff0c;是指AI的【角色扮演法】&#xff0c;openai官方基于AI原理给出的让AI听话的技巧。所有AI的使用玩法&#xff…

Leetcode3250. 单调数组对的数目 I

Every day a Leetcode 题目来源&#xff1a;3250. 单调数组对的数目 I 解法1&#xff1a;记忆化搜索 题目输入一个数组nums。 假设有两个数组A和B&#xff0c;A递增&#xff0c;B递减&#xff0c;且 Ai Bi numsi ​ 问有多少对(A,B)数组对。 解法&#xff1a; 代码&…

java基础知识-JVM知识详解

一、JVM内存结构 Java虚拟机(JVM)的内存结构主要分为几个不同的区域,每个区域都有其特定的目的和功能。以下是JVM内存结构的主要组成部分: 先看一下总体的结构图 程序计数器(Program Counter Register) 这是一个较小的内存块,用于存储当前线程所执行的字节码指令的地址…

第T4周:猴痘病识别

本文为&#x1f517;365天深度学习训练营 中的学习记录博客原作者&#xff1a;K同学啊 我的环境&#xff1a; ● 语言环境&#xff1a;Python3.6.5 ● 编译器&#xff1a;jupyter notebook ● 深度学习框架&#xff1a;TensorFlow 2.6.2 ● 数据&#xff1a;猴痘病数据集 一、…

非 congda 环境 ubuntu 22.04 源码编译安装 pytorch 并初步检查可用性

非 congda 环境 编译安装 pytorch 0, 安装 cuda sdk &#xff0c;cudnn 及 nccl 按照官网步骤&#xff0c;blacklist需要特别注意 0.1 cuda sdk 0.2 cudnn 0.3 安装nccl git clone --recursive https://github.com/NVIDIA/nccl.git ls cd nccl/ make -j src.build sudo apt…

使用 docker 部署 kvm 图形化管理工具 WebVirtMgr

文章目录 [toc]前提条件镜像构建启动 webvirtmgr创建其他 superuser配置 nginx 反向代理和域名访问绑定 kvm 宿主机local sockettcp 连接 虚拟机创建创建快照虚拟机克隆删除虚拟机 kvm 官方提供了以下这些图形化管理&#xff0c;license 这块也提示了是商业版&#xff08;Comme…

rometheus Blackbox监控网站

Blackbox Exporter简介 blackbox_exporter 是 Prometheus 拿来对 http/https、tcp、icmp、dns、进行的黑盒监控工具&#xff0c;也就是从服务、主机等外部进行探测&#xff0c;来查看服务、主机等是否可用。 Blackbox Exporter 默认端口是 9115&#xff0c; 安装1 wget htt…

Codeforces Round (Div.3) C.Sort (前缀和的应用)

原题&#xff1a; time limit per test&#xff1a;5 seconds memory limit per test&#xff1a;256 megabytes You are given two strings a and b of length n. Then, you are (forced against your will) to answer q queries. For each query, you are given a range …

Dify 与 FastGPT 流程编排能力对比分析

Dify 与 FastGPT 流程编排能力对比分析 一、引言 在人工智能快速发展的今天&#xff0c;大语言模型&#xff08;LLM&#xff09;应用平台正在重塑各行各业的工作流程。其中&#xff0c;Dify 和 FastGPT 作为两款具有重要影响力的工具&#xff0c;凭借各自独特的流程编排能力&a…

智能化升级:AI在客服知识库中的应用

引言 在数字化时代&#xff0c;客户服务已成为企业竞争的关键一环。随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;传统客服模式正经历着前所未有的变革。AI与客服知识库的深度融合&#xff0c;不仅极大地提升了客服处理的效率与准确性&#xff0c;还为用…