EMQX webhook消息转发Web服务器
- 一、前言
- 二、实现
- 1、EMQX服务器搭建
- EMQX下载、安装、启动
- 2、本地Web服务搭建
- 创建Flask项目
- 代码
- 3、EMQX中创建webhook数据桥接
- 4、EMQX中创建数据转发规则
- 三、效果
一、前言
需求:获取设备通过mqtt协议发送过来的数据并将数据保存到外部服务中,期间需要使用EMQX代理服务器将消息转发到自有的Web服务中
实现:通过EMQX中的webhook实现消息转发
官方:https://www.emqx.io/docs/zh/v5.0/data-integration/data-bridge-webhook.html
二、实现
1、EMQX服务器搭建
EMQX下载、安装、启动
- 到EMQX官网进行下载:https://www.emqx.io/zh/downloads?os=Windows
- 安装运行完成后,可直接访问 EMQX Dashboard 管理控制台
http://localhost:18083/
(localhost
根据实际IP地址修改)
- 默认用户名及密码
- admin / public
- 停止EMQX服务
- 打开cmd,进入到emqx所在文件夹中的
bin
目录 - 输入一下指令
./emqx/bin/emqx stop
- 打开cmd,进入到emqx所在文件夹中的
2、本地Web服务搭建
本次Web服务器使用Python Flask进行搭建
创建Flask项目
- 相关功能的代码实现直接在
app.py
中完成
代码
本次将 mqtt客户端 和 接收EMQX转发的消息数据 都写在该Flask项目中
import json
from flask import request, jsonify, Flask, Blueprint, render_template, session, current_app
from flask_mqtt import Mqtt
from werkzeug.local import LocalProxy
app = Flask(__name__)
# 代理地址(根据实际使用的IP地址进行修改,需要和EMQX处于同一地址)
app.config['MQTT_BROKER_URL'] = '127.0.0.1'
# 端口
app.config['MQTT_BROKER_PORT'] = 1883
# 当需要验证用户名和密码时,设置该项(根据实际情况设定)
# app.config['MQTT_USERNAME'] = 'user'
# 当需要验证用户名和密码时,设置该项(根据实际情况设定)
# app.config['MQTT_PASSWORD'] = '123456'
# 设置心跳时间,单位为秒
app.config['MQTT_KEEPALIVE'] = 60
# 如果服务器支持 TLS,则设置为 True
app.config['MQTT_TLS_ENABLED'] = False
# 主题(根据实际情况设定)
topic = 't/1'
# 实例化
mqtt_client = Mqtt(app)
@app.route('/')
def index():
# 初始路由
return render_template('index.html')
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
print('Connected successfully')
# 订阅主题
mqtt_client.subscribe(topic)
else:
# 连接失败
print('Bad connection. Code:', rc)
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
""" 消息回调函数 """
# 定义接受到的消息
data = dict(
# 主题
topic=message.topic,
# 内容
payload=message.payload.decode()
)
print(data)
# 打印输出接收到的消息
print('Received message on topic: {topic} with payload: {payload}'.format(**data))
@app.route('/publish', methods=['POST'])
def publish_message():
"""
消息发布接口(实际应用中,该接口可能需要处理一些复杂业务逻辑)
"""
# 获取前端页面提交的数据,并格式化
request_data = request.get_json()
# print("接收到的数据", request_data)
# 发布消息
publish_result = mqtt_client.publish(request_data['topic'], request_data['payload'])
# 返回JSON数据
return jsonify({'code': publish_result[0]})
@app.route('/emqx', methods=['POST'])
def test_emqx_conn():
"""
测试 搭建简易EMQX HTTP服务(用于接收EMQX转发过来的消息)
在后面的 webhook数据桥接 创建中,URL填写为:http://127.0.0.1:5000/emqx
"""
# 响应
reply = {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
app.debug = True
app.run()
3、EMQX中创建webhook数据桥接
- EMQX控制台中找到
webhook
,点击创建
- 输入数据桥接名称(要求是大小写英文字母和数字的组合)
- 请求方法选择 POST,
- URL 为 http://127.0.0.1:5000/emqx(根据实际使用填写)
- 其他使用默认值
- 点击最下方保存按钮完成规则创建。
4、EMQX中创建数据转发规则
- 创建好webhook后,会自动根据创建的webhook桥接生成一个规则
- 直接点击生成的规则中的 设置
SQL编辑器
根据个人实际业务进行修改,修改完成后直接点击更新
三、效果