消息通讯-MQTT WebHookSpringBoot案例

news2025/1/11 9:56:05

一、EMQX WebHook介绍

1、EMQX WebHook 是由 emqx_web_hook (opens new window)插件提供的将EMQX中的钩子事件通知到某个Web服务的功能。
2、WebHook 的内部实现是基于钩子,借助 Webhook 可以完成设备在线、上下线记录,订阅与消息存储、消息送达确认等诸多业务。

3、它通过在钩子上的挂载回调函数,获取到 EMQX 中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

以 客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

    Client      |    EMQX     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+

注意:WebHook 对于事件的处理是单向的,它仅支持将 EMQX 中的事件推送给 Web 服务,并不关心 Web 服务的返回。

二、EMQX WebHook配置项

Webhook 的配置文件位于 etc/plugins/emqx_web_hook.conf,配置项的详细说明可以查看官方文档配置

在默认etc/plugins/emqx_web_hook.conf中,官方为我们提供了如下的配置触发规则:

##====================================================================
## WebHook
##====================================================================

web.hook.api.url = http://127.0.0.1:80

## Format:
##   web.hook.rule.<HookName>.<No> = <Spec>
#web.hook.rule.client.connect.1       = {"action": "on_client_connect"}
#web.hook.rule.client.connack.1       = {"action": "on_client_connack"}
#web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
#web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
#web.hook.rule.client.subscribe.1     = {"action": "on_client_subscribe"}
#web.hook.rule.client.unsubscribe.1   = {"action": "on_client_unsubscribe"}
#web.hook.rule.session.subscribed.1   = {"action": "on_session_subscribed"}
#web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
#web.hook.rule.session.terminated.1   = {"action": "on_session_terminated"}
#web.hook.rule.message.publish.1      = {"action": "on_message_publish"}
#web.hook.rule.message.delivered.1    = {"action": "on_message_delivered"}
#web.hook.rule.message.acked.1        = {"action": "on_message_acked"}

其中webhook支持的事件如下:

三、如何使用EMQX WebHook配置

手动修改Webhook的配置文件位于:etc/plugins/emqx_web_hook.conf.
修改配置非常简单,打开指定文件后,官方已经写好这些了,如上面所示,
只需要修改web.hook.url,和在你需要的钩子事件前的#号去掉即可。

emqx 启用插件 webhook插件(emqx有web控制台,在插件里启用就可以)

刷新后emqx_web_hook插件运行中即启动成功

然后就是自行搭建一个web接口服务,当客户端有指定事件后会主动推送,
接口地址即为Webhook的配置文件中配置的web.hook.api.url

WebHook配置事件推送参数详解

1、client.connect(处理连接报文)

 

2、client.connack(下发连接应答)

3、client.connected(成功接入)

4、client.disconnected(连接断开)

5、client.subscribe(订阅主题)

6、client.unsubscribe(取消订阅)

session.subscribed(会话订阅主题)
同 client.subscribe,action 为 session_subscribed

session.unsubscribed(会话取消订阅)
同 client.unsubscribe,action 为 session_unsubscribe

1、message.publish(消息发布)
2、message.delivered(消息投递)

3、message.acked(消息回执)

四、SpringBoot集成Webhook实现客户端断连监控

通过使用Emqx的Webhook插件实现监控所有客户端的连接状态,并做出客户端上线和下线后的逻辑处理

1. 实现前提

修改 etc/plugins/emqx_web_hook.conf 文件,设置事件转发的url和地址和触发规则
 

# 事件需要转发的目的服务器地址
web.hook.api.url = http://127.0.0.1:8000/api/mqtt/webhook

# 打开下面两个触发规则
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
2. 代码实现接口

通过EMQX 的webhook将客户端的连接断开等事件通知到我们自建的服务上,通过事件类型获取客户端的连接状态,然后将客户端的连接状态进行存储,并且提供HTTP API供后台系统查询所有客户端的状态。

package com.team.modules.mqtt.controller;

import com.google.gson.Gson;
import com.team.modules.mqtt.client.MyMQTTClient;
import com.team.modules.mqtt.service.SenderService;
import com.team.modules.system.domain.vo.ResultVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@RestController
@Slf4j
@RequestMapping("/api/mqtt")
public class SenderController {

    @Autowired
    private MyMQTTClient myMQTTClient;

    @PostMapping("/webhook")
    public void webhook(@RequestBody() Map<String, Object> message) {
        // 监听连接的上线和下线
        System.out.println("收到的消息:" + message);

        String action = (String) message.get("action");
        String clientid = (String) message.get("clientid");
        if (action.equals("client_connected")) {
            log.info("client:{} 上线", clientid);
        }
        if (action.equals("client_disconnected")) {
            log.info("client:{} 下线", clientid);
        }
    }
}
3. 监听结果

启动服务后,模拟客户端连接emqx,查看服务记录的日志:

五、总结

由此看出客户端建立连接和断开连接后,Emqx服务都会给配置的接口web.hook.api.url = http://127.0.0.1:8000/api/mqtt/webhook发送一条数据,该数据字段可以参考上述的:WebHook配置事件推送参数详解。使用此方法可以将mqtt应用的更加灵活。其他的监听事件配置也类似,遇到合适场景可以自行发挥。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1226532.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Spring Boot 源码学习】Banner 信息打印流程

Spring Boot 源码学习系列 Banner 信息打印流程 引言往期内容主要内容1. printBanner 方法2. 关闭 Banner 信息打印3. SpringApplicationBannerPrinter 类3.1 LOG 模式打印3.1.1 getBanner 方法3.1.1.1 新建 Banners3.1.1.2 添加 ImageBanner3.1.1.3 添加 ResourceBanner3.1.1.…

Flume的安装部署及常见问题解决

1.安装地址 &#xff08;1&#xff09; Flume官网地址&#xff1a;http://flume.apache.org/ &#xff08;2&#xff09;文档查看地址&#xff1a;http://flume.apache.org/FlumeUserGuide.html &#xff08;3&#xff09;下载地址&#xff1a;http://archive.apache.org/dist…

RVC从入门到......

RVC变声器官方教程&#xff1a;10分钟克隆你的声音&#xff01;一键训练&#xff0c;低配显卡用户福音&#xff01;_哔哩哔哩_bilibili配音&#xff1a;AI逍遥散人&#xff08;已授权&#xff09;关注UP主并私信"RVC"&#xff08;三个字母&#xff09;自动获取一键训…

PS 颜色取样器标尺工具 基本使用讲解

上文 PS 吸管工具基本使用方法 我们讲完了 吸管工具 那么 我们继续 打开ps先 接着 我们选择这个 颜色取样器工具 选择之后 我们鼠标在图像上随便点一下 就会出现一个标记 然后 我们可以点多几个地方 边上的信息面板就会输出 点1 和 点2 甚至 多个 点3 点4 的 颜色 RGB代码 …

Python 如何实现备忘录设计模式?什么是备忘录设计模式?Python 备忘录设计模式示例代码

什么是备忘录&#xff08;Memento&#xff09;设计模式&#xff1f; 备忘录&#xff08;Memento&#xff09;设计模式是一种行为型设计模式&#xff0c;用于捕获一个对象的内部状态&#xff0c;并在对象之外保存这个状态&#xff0c;以便在需要时恢复对象到先前的状态。这种模…

[qemu逃逸] DefconQuals2018-EC3

前言 一道简单的套壳堆题.原本题目环境为 ubu16, 我这里使用的是 ubu18 设备逆向 qemu-system-x86_64 只开了 Canary 和 NX 保护. 比较简单, 主要逻辑在 mmio_write 里面, 其实现了一个菜单堆, 具有增删改的功能: 但是在释放堆块时并没有置空, 所以这里存在 UAF. 而程序还直…

三、程序员指南:数据平面开发套件

定时器库 定时器库为DPDK执行单元提供了定时器服务&#xff0c;以便异步执行回调函数。该库的特点包括&#xff1a; 定时器可以是周期性的&#xff08;多次触发&#xff09;或单次的&#xff08;一次性触发&#xff09;。定时器可以从一个核加载并在另一个核上执行。这必须在…

IntelliJ IDEA 2023 v2023.2.5

IntelliJ IDEA 2023是一款功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;为开发人员提供了许多特色功能&#xff0c;以下是其特色介绍&#xff1a; 新增语言支持&#xff1a;IntelliJ IDEA 2023新增对多种编程语言的支持&#xff0c;包括Kotlin、TypeScript、…

介绍交换空间概念以及如何设置交换空间

文章目录 什么交换空间新增交换空间 什么交换空间 交换空间&#xff08;Swap space&#xff09;是计算机内存的一种补充&#xff0c;位于硬盘驱动器上。当物理内存不足时&#xff0c;系统会将不活跃的页面移到交换空间中。 交换空间可以帮助系统在以下情况下运行&#xff1a…

mysql 实现去重

个人网站 首发于公众号小肖学数据分析 1、试题描述 数据表user_test如下&#xff0c;请你查询所有投递用户user_id并且进行去重展示&#xff0c;查询结果和返回顺序如下 查询结果和返回顺序如下所示 解题思路&#xff1a; (1) 对user_id列直接去重&#xff1a; &#xff…

Kotlin学习(一)

Kotlin学习&#xff08;一&#xff09; 1.使用IDEA构建Kotlin项目 新建工程即可 我这里选择的Build System是IntelliJ&#xff0c;虽然我没用过但是这是Kotlin基础学习应该不会用到其他依赖 2.Hello World package com.simonfun main(args:Array<String>){println(&q…

Go 语言中切片的使用和理解

切片与数组类似&#xff0c;但更强大和灵活。与数组一样&#xff0c;切片也用于在单个变量中存储相同类型的多个值。然而&#xff0c;与数组不同的是&#xff0c;切片的长度可以根据需要增长和缩小。在 Go 中&#xff0c;有几种创建切片的方法&#xff1a; 使用[]datatype{val…

使用 C 语言快速排序将字符串按照 ASCII 码升序排列

示例代码&#xff1a; #include <stdio.h> #include <string.h> #include <stdlib.h>static Comp(const void *a, const void *b) {char *pa (char *)a;char *pb (char *)b;return strcmp(a, b); }int main(void) {char strs[3][10] { "bd", &q…

C++ Qt 学习(十):Qt 其他技巧

1. 带参数启动外部进程 QProcess 用于启动外部进程int QProcess::execute(const QString &program, const QStringList &arguments);QObject *parent; ... QString program "./path/to/Qt/examples/widgets/analogclock"; QStringList arguments; argument…

卷积、卷积图像操作和卷积神经网络

好多内容直接看书确实很难坚持&#xff0c;就比如这个卷积&#xff0c;书上的一大堆公式和图表直接把人劝退&#xff0c;我觉得一般的学习流程应该是自顶向下&#xff0c;先整体后局部&#xff0c;先把握大概再推敲细节的&#xff0c;上来就事无巨细地展示对初学者来说很痛苦。…

泉盛UV-K5/K6全功能中文固件

https://github.com/wu58430/uv-k5-firmware-chinese/releases 主要功能&#xff1a; 中文菜单 许多来自 OneOfEleven 的模块&#xff1a; AM 修复&#xff0c;显著提高接收质量长按按钮执行 F 操作的功能复制快速扫描菜单中的频道名称编辑频道名称 频率显示选项扫描列表分配…

文本转语音

免费工具 音视频转译 通义听悟 | https://tingwu.aliyun.com/u/wg57n33kml5nkr3p 音色迁移 speechify | https://speechify.com/voice-cloning/ 视频生成 lalamu | http://lalamu.studio/demo/ 画质增强 topazlabs video AI | https://www.topazlabs.com 付费工具 rask | htt…

重生奇迹mu转职任务详解

重生奇迹mu神骑士怎么转 神骑士是一种转职类型&#xff0c;需要你的角色达到一定等级以及完成相应任务方可转职。以下是神骑士转职的具体步骤&#xff1a; 1.等级要求&#xff1a;首先&#xff0c;你的角色需要达到150级才能进行神骑士转职任务。 2.神骑士转职任务&#xff…

hyperledger fabric2.4测试网络添加组织数量

!!!修改内容比较繁琐,预期未来提供模板修改 修改初始配置文件,初始添加3个组织 organizations文件夹 /cryptogen文件夹下创建文件crypto-config-org3.yaml,内容如下: PeerOrgs:# ---------------------------------------------------------------------------# Org3# ----…

获取每个部门中当前员工薪水最高的相关信息

个人网站 首发于公众号小肖学数据分析 描述 有一个员工表dept_emp简况如下: 有一个薪水表salaries简况如下: 获取每个部门中当前员工薪水最高的相关信息&#xff0c;给出dept_no, emp_no以及其对应的salary&#xff0c;按照部门编号dept_no升序排列&#xff0c;以上例子输出…