使用 EMQX 开源版的 Webhook 机制处理消息并存储数据

news2025/1/9 16:12:52

1、前言

EMQX 是一款强大的开源 MQTT 消息代理,它支持大量的连接和高吞吐量,适用于各种物联网应用。Webhook 是 EMQX 提供的扩展功能之一,用于将消息推送到外部的 HTTP 服务。在本文中,我们将介绍如何使用 EMQX 开源版的 Webhook 机制,并展示如何处理收到的 Webhook 请求,将其中的数据存储到数据库中。

2、Webhook 简介

Webhook 是一种常见的 HTTP 回调机制,用于将事件或数据推送到外部服务器。当 MQTT 客户端发布消息时,EMQX 可以通过 Webhook 将该消息发送给指定的 HTTP 端点,方便我们在接收到消息后进一步处理数据。

3、搭建 Webhook 服务

接下来,我们编写一个简单的 SpringBoot 2.7服务,用于接收 EMQX 的 Webhook 请求并将其中的数据存储到数据库中。

3.1、项目依赖

pom.xml 中添加以下依赖:

    <dependencies>
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.5</version>
        </dependency>

        <!-- Jackson Databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.5}</version>
        </dependency>

        <!-- Jackson Annotations -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.13.5</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.6</version>
        </dependency>
    </dependencies>  

3.2、实现 Webhook 控制器

3.2.1、Controller
package ....这里填写你自己的

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.Map;

@RestController
@RequestMapping("/emqx/test")
@AllArgsConstructor
@Slf4j
public class WebhookController {

    private final EmqxTestService emqxTestService;
    private final ObjectMapper objectMapper = new ObjectMapper();


    @PostMapping("/webhook")
    public String webhook(@RequestBody String payload) {
        try {
            // 解析主 JSON 字符串为 Map
            Map<String, Object> payloadMap = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {});

            // 从主 Map 中提取 clientid 和 topic
            String clientId = (String) payloadMap.get("clientid");
            String topic = (String) payloadMap.get("topic");

            log.info("Received clientid: {}", clientId);
            log.info("Received topic: {}", topic);

            // 提取 payload 字段的 JSON 字符串
            String payloadString = (String) payloadMap.get("payload");

            // 解析 payload 字段的 JSON 字符串为 Map
            Map<String, Object> payloadDataMap = objectMapper.readValue(payloadString, new TypeReference<Map<String, Object>>() {});

            // 从 payload 数据中提取 msg 参数
            String msg = (String) payloadDataMap.get("msg");

            log.info("Received msg: {}", msg);

            // 创建 EmqxTest 实例并设置字段
            EmqxTest testData = new EmqxTest();
            testData.setData(payload);
            testData.setClientId(clientId);
            testData.setTopic(topic);

            // 保存数据
            emqxTestService.insertData(testData);

        } catch (IOException e) {
            log.error("解析JSON有效负载失败", e);
            return "Error parsing payload";
        }

        return "Received";
    }
}
 3.2.2、Service
package ....这里填写你自己的

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ldb.tool.entity.EmqxTest;
import com.ldb.tool.mapper.EmqxTestMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service
@AllArgsConstructor
@Slf4j
public class EmqxTestService {

    private final EmqxTestMapper emqxTestMapper;

    public EmqxTest insertData(EmqxTest testData) {
        EmqxTest emqxTest = new EmqxTest();
        // 你可以手动设置其他需要的字段,如 clientId, topic, data 等
        emqxTest.setClientId(testData.getClientId());
        emqxTest.setTopic(testData.getTopic());
        emqxTest.setData(testData.getData());
        emqxTest.setCreateTime(new Date()); // 如果你有自动填充策略,可以忽略这行

        this.emqxTestMapper.insert(emqxTest);

        return emqxTest;
    }
}
3.2.3、Mapper
package ...这里填写你自己的;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ldb.tool.entity.EmqxTest;

public interface EmqxTestMapper extends BaseMapper<EmqxTest> {
}
 3.2.4、Entity
package ...这里填写你自己的;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@TableName("emqx_test")
@Data
public class EmqxTest implements Serializable {
    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
    private String clientId;
    private String topic;
    private String data;
    private Date createTime;
    private Date updateTime;
}

4、配置 EMQX Webhook

4.1、运行

我们这里使用docker来运行EMQX。

通过 Docker 运行 EMQX | EMQX文档

4.1.1、获取镜像
docker pull emqx/emqx:5.8.0
4.1.2、启动容器

docker run -d --name emqx \
  -p 1883:1883 -p 8083:8083 \
  -p 8084:8084 -p 8883:8883 \
  -p 18083:18083 \
  -v $PWD/data:/opt/emqx/data \
  -v $PWD/log:/opt/emqx/log \
  emqx/emqx:5.8.0

4.2、配置EMQX-Webhook

4.2.1、创建Webhook

访问EMQX可视化后台(http://localhost:18083/)=>集成=>Webhook=>创建Webhook

在填写设置的时候,需要注意的是我们本地docke访问宿主机,在容器内部URL:127.0.0.1,指向的是容器本身,你可以获取宿主机IP作为URL,比如192.168.30.44。

我们通过URL选项的测试按钮可以点击测试是否正常请求。

5、测试 Webhook

在保证我们的Java-Webhook、EMQX服务运行的情况下,我们可以通过MQTTX(简介 - MQTTX 文档)软件去模拟一台直连的MQTT设备发起一个主题,因为我们在创建Webhook的时候触发者是消息发布。

5.1、MQTTX发送主题

首先我们需要新建一个MQTT连接,配置如下所以,未设置认证的话不需要用户名密码。

右下角,我们填写主题(Topic)的消息路由为listen/me,消息内容为{"msg": "send messgae","status":1},点击小飞机按钮发送。

5.2、查看Webhook触发情况

在EMQX后台,集成=>Webhook,查看送达情况。

在查看我们的Java服务的日志打印,也收到了。

查看sql表,也已经正常保存。

6、结论

Webhook 是一种强大的机制,MQTT 消息发布事件触发后,通过 HTTP 推送到 Spring Boot 服务,对接收到的数据进行解析和存储。这种机制能够让我们轻松地将消息从 EMQX 转发到其他服务,从而实现复杂的业务逻辑处理。


7、参考资料

  • EMQX官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2112872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RabbitMQ 03 在项目中的实际使用: 告警,批处理

01.例子&#xff0c;解耦合&#xff08;使用异步&#xff09; 1.1异步思想&#xff1a;不会专门等待 1.2 例子&#xff1a;程序执行 1.3 如何设计程序 多线程&#xff1a; 订单请求模块只用于发送请求和处理确认&#xff0c;订单处理模块专门用于处理请求并且发送确认信…

金税四期工程运维:税务领域的信息化挑战与策略

在信息化浪潮的推动下&#xff0c;中国税务系统迎来了“金税四期”工程这一重大变革。作为税务信息化的新阶段&#xff0c;金税四期不仅标志着税务管理向更高效、更智能的方向迈进&#xff0c;同时也对运维工作提出了前所未有的挑战。本文将从金税四期的背景、运维需求分析、面…

101.对称二叉树

&#xff08;写给未来遗忘的自己&#xff09; 题目&#xff1a; 代码&#xff08;层次递归&#xff1a;&#xff09;&#xff1a; class Solution { public:bool isSymmetric(TreeNode* root) {std::queue<TreeNode*>lefttoright;std::queue<TreeNode*>righttol…

CSP-J基础之进制转换

文章目录 前言数制1. **二进制 (Binary)**2. **八进制 (Octal)**3. **十进制 (Decimal)**4. **十六进制 (Hexadecimal)** K进制转十进制例子 1&#xff1a;以二进制&#xff08;K 2&#xff09;为基数例子 2&#xff1a;以八进制&#xff08;K 8&#xff09;为基数例子 3&…

【MATLAB源码-第160期】基于matlab的胡桃夹子优化算法(NOA)无人机三维路径规划,输出做短路径图和适应度曲线

操作环境&#xff1a; MATLAB 2022a 1、算法描述 胡桃夹子优化算法&#xff08;Nutcracker Optimization Algorithm, NOA&#xff09;是一个灵感来源于胡桃夹子的故事的元启发式优化算法。这个故事中&#xff0c;胡桃夹子是一个能够将坚果壳轻易地破开以获取内部果仁的工具。…

滚珠花键助力生产加工精准化!

滚珠花键是一种机械传动元件&#xff0c;它通过花键轴与花键孔的配合&#xff0c;将动力从一个轴传递到另一个。在工业自动化领域内&#xff0c;滚珠花键系列产品主要用于辅助直线运动。尤其是在那些需要精密传动的应用场景&#xff0c;而滚珠花键在生产加工中的优势主要体现在…

Vulnhub:Dr4g0n b4ll 1

靶机下载地址 信息收集 主机发现 nmap扫描攻击机同网段存活主机。 nmap 192.168.31.0/24 -Pn -T4 靶机ip&#xff1a;192.168.31.183 端口扫描 nmap 192.168.31.183 -A -p- -T4 开放了22,80端口&#xff0c;端口详细信息如下&#xff1a; 网站信息收集 访问http服务。 …

WPF入门到跪下 第十一章 Prism(四)View与ViewModel的自动关联

View与ViewModel的自动关联 一、ViewModelLocator 在学习MvvmLight框架时&#xff0c;也使用了ViewModelLocator类。但在MvvmLight框架中&#xff0c;ViewModelLocator只是一个自定义类&#xff0c;与框架无关&#xff0c;目的就是初始化IOC容器。而在Prism框架中则不同&…

matplotlib中文乱码问题

在使用Matplotlib进行数据可视化的过程中&#xff0c;经常会遇到中文乱码的问题。显示乱码是由于编码问题导致的&#xff0c;而matplotlib 默认使用ASCII 编码&#xff0c;但是当使用pyplot时&#xff0c;是支持unicode编码的&#xff0c;只是默认字体是英文字体&#xff0c;导…

【LeetCode】08.字符串转换整数

题目要求 解题思路 本题没有难点&#xff0c;只需注意最大整数的比较时要切换成long long 代码实现 class Solution { public:int myAtoi(string s) {//标记正负号int flag1;long long ret0;int ns.size();int i0;//去除空格while(s[i] ) i;//识别符号if(s[i]-) flag-1;i…

vue项目打包后,生成的index.html直接本地打开后没内容

应该是文件路径找不到了 可以打开控制台看看 可以看到加载css&#xff0c;js&#xff0c;图标资源失败&#xff0c;所以是文件路径问题 vue-cli工程化生成的项目在打包后&#xff0c;默认的资源寻找路径是根&#xff0c;所以可以看到它直接在/F;/favicon找图标&#xff0c;但…

机器学习模型中的因果关系:引入单调约束

单调约束是使机器学习模型可行的关键&#xff0c;但它们仍未被广泛使用欢迎来到雲闪世界。 碳ausality 正在迅速成为每个数据科学家工具包中必不可少的组成部分。 这是有充分理由的。 事实上&#xff0c;因果模型在商业中具有很高的价值&#xff0c;因为它们为“假设”情景提…

经典文献阅读之--WidthFormer(基于Transformer的BEV方案量产方案)

0. 简介 《WidthFormer: Toward Efficient Transformer-based BEV View Transformation》提出了WidthFormer&#xff0c;这是一种基于Transformer的新颖鸟瞰视角&#xff08;Birds-Eye-View, BEV&#xff09;三维检测方法&#xff0c;专为实时自动驾驶应用而设计。WidthFormer…

网络安全 day5 --- 反弹SHELL不回显带外正反向连接防火墙出入站文件下载

免责声明 本免责声明适用于作者所有文章内容。使用者需明确&#xff0c;网络安全技术仅供学习和合法研究使用&#xff0c;不得用于任何非法活动&#xff0c;如未经授权的入侵、攻击或数据窃取&#xff0c;所有相关法律责任由使用者自行承担。由于网络安全操作可能带来系统崩溃、…

程序中的零值比较

前言&#xff1a;什么是零值&#xff1f; 在C/C中&#xff0c;“零值”通常指的是数值类型的零&#xff08;0&#xff09;&#xff0c;对于指针来说则是空指针&#xff08;nullptr 或 NULL&#xff09;。下面我们将分别讨论如何比较整型、字符、浮点数和指针与“零值”的比较。…

顺序表与链表练习

目录 1.在长度为n(n > 1)的单链表上&#xff0c;设有头和尾两个引用&#xff0c;执行( )操作与链表的长度有关。 2.下列关于链表的说法那个是正确的( ) 3. 关于链表和顺序表间的区别&#xff0c;叙述错误的是&#xff08; &#xff09; 4.在长度为 n 的顺序表下标为 i…

oatpp apiclient 客户端get,post请求python fastapi demo

最新用fastapi搞了个服务端,python功能太强了,就是环境不好弄,弄好后,不要轻易换python版本,不要装多个python版本 前面搞了个oatpp webapi服务端,现在要用客户端,为什么用opatpp客户端,因为他不再带其他库了 demo: 我的请求比较简单,就是向python 的 fastapi服务端…

CSP-J基础之常见的竞赛题库

文章目录 CSP-J基础之常见的竞赛题库1. 可达 (KEDA)2. 洛谷 (Luogu)3. Codeforces 洛谷账号的注册总结 CSP-J基础之常见的竞赛题库 在备战CSP-J&#xff08;Certified Software Professional Junior&#xff09;及其他信息学竞赛时&#xff0c;选手们常需要借助在线题库来进行…

GIS圈大事件!Cesium被收购了,是好是坏?

大家好&#xff0c;我是日拱一卒的攻城师不浪&#xff0c;致力于技术与艺术的融合。这是2024年输出的第34/100篇文章。 Cesium开发交流群V&#xff1a;brown_7778&#xff08;备注来意&#xff09; 一觉醒来&#xff0c;突然看到Cesium官方发的消息&#xff0c;宣布通过收购的方…

第十三届山东省ICPC

vp链接&#xff1a;https://codeforces.com/gym/104417 A. Orders 根据题意模拟&#xff0c;分别按照 a&#xff0c;b 排序&#xff0c;排序后再判断该订单是否能完成。 #include <bits/stdc.h> using namespace std;#define int long longconst int N 105; int n, k…