《flask》flask+mqtt联动快速上手

news2024/11/18 2:56:10

简介

本文旨在介绍如何快速上手联动flask + mqtt,本文将会给出一个简单的demo,用于演示在如何通过访问flask接口来触发mqtt,并在flask运行的基础的上对mqtt进行订阅。

快速上手

因为有项目需求,所以需要flask + mqtt进行联动,因为需要一直开启监听,所以需要一直挂在一个线程上,一开始想到用多线程做,或者说用异步协程来做,后来发现有一个关于flask的mqtt扩展库,因此为了快速上手,直接用该库进行开发,可以节省很多精力。

link: https://github.com/stlehmann/Flask-MQTT

首先,我们需要安装flask-mqtt库。

pip install flask-mqtt

下面放上一个简单的示例代码,里面包括了MQTT的订阅、发布,flask的访问。

"""  
A small Test application to show how to use Flask-MQTT.  
"""  
  
import eventlet  
from flask import Flask, render_template  
from flask_mqtt import Mqtt  
  
  
eventlet.monkey_patch()  
  
app = Flask(__name__)  
app.config['SECRET'] = 'my secret key'  
app.config['TEMPLATES_AUTO_RELOAD'] = True  
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'  
app.config['MQTT_BROKER_PORT'] = 1883  
app.config['MQTT_USERNAME'] = ''  
app.config['MQTT_PASSWORD'] = ''  
app.config['MQTT_KEEPALIVE'] = 5  
app.config['MQTT_TLS_ENABLED'] = False  
app.config['MQTT_CLEAN_SESSION'] = True  
  
  
mqtt = Mqtt(app)  
  
  
@app.route('/')  
def index():  
    return "hello mqtt_flask"  
  
@app.route('/hello')  
def hello():  
    mqtt.publish('hello', 'hello, this is flask')  
    print("[mqtt] publish successfully")  
    return "publish successfully"  
  
@mqtt.on_connect()  
def handle_connect(client, userdata, flags, rc):  
    mqtt.subscribe('hello')  
    print("[mqtt] has listen topic hello")  
  
  
@mqtt.on_message()  
def handle_mqtt_message(client, userdata, message):  
    data = dict(  
        topic=message.topic,  
        payload=message.payload.decode()  
    )  
    print(data)  
  
if __name__ == '__main__':  
    # important: Do not use reloader because this will create two Flask instances.  
    # Flask-MQTT only supports running with one instance    # socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=False)    app.run()

订阅测试

这里用了MQTTX工具来进行测试,现在我已经连接到了一个MQTT服务器上,我将用MQTTX向该服务器发送一个topic为hello,payload为{"msg": "Hello, This is mqtt server"}的消息。

运行程序后,MQTTX发送消息,结果如下,可以看到,接收正常。

发布测试

现在我们访问http://localhost:5000/hello,flask服务器将会向我的MQTT服务器发送一个topic为hello,payload为"hello, this is flask"的消息,我们用MQTTX来监听。

从下图中可以看到,MQTTX成功订阅接收到了该消息,至此,关于FLASK+MQTT的必要流程已经可以走通了。

进一步测试

最初,笔者以为启动mqtt_client.loop_start()函数之后会线程堵塞,但是在使用 Paho MQTT Python 库时,调用 client.loop_start() 方法会在后台启动一个新线程,用于处理 MQTT 客户端的事件循环。这意味着您的代码不会堵塞,并且可以在 MQTT 客户端的事件循环运行时继续执行。

例如,您可以在调用 client.loop_start() 后使用以下代码继续执行其他操作:

import paho.mqtt.client as mqtt

# The callback for when a message is received from the server
def on_message(client, userdata, message):
    print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'")

# Create an MQTT client
client = mqtt.Client()

# Set the on_message callback
client.on_message = on_message

# Connect to the MQTT server
client.connect("mqtt.example.com", 1883, 60)

# Subscribe to a topic
client.subscribe("my/topic")

# Start the MQTT client loop in the background
client.loop_start()

# Do other work here
print("Doing other work")

# Wait for a while
time.sleep(10)

# Stop the MQTT client loop
client.loop_stop()

在上面的代码中,我们在调用 client.loop_start() 后立即执行了其他代码,然后等待了 10 秒钟。在这期间,MQTT 客户端的事件循环仍在后台运行,并等待接收消息。

基于此,那么flask和mqtt联动其实不用上述的扩展库也可以,笔者在paho官方的demo上小改一下,最后结果如下所示:

"""
访问 localhost:5000/hello时,会用mqtt客户端发布主题消息
"""
from flask import Flask, jsonify  
import paho.mqtt.client as mqtt  
import sys  
  
  
app = Flask(__name__)  
print('[app] start work', file=sys.stdout)  
  
  
def connect_mqtt() -> mqtt:  
    def on_connect(client, userdata, flags, rc):  
        if rc == 0:  
            print("Connected to MQTT Broker!", file=sys.stdout)  
        else:  
            print("Failed to connect, return code %d\n", rc, file=sys.stdout)  
  
    client = mqtt.Client()  
    client.username_pw_set("", "")  
    client.on_connect = on_connect  
    client.connect("broker.emqx.io", 1883, 60)  
    return client  
  
  
def subscribe(client: mqtt, topic):  
    def on_message(client, userdata, msg):  
        print(f"[mqtt] Received `{msg.payload.decode()}` from `{msg.topic}` topic", file=sys.stdout)  
  
    client.on_message = on_message  
    client.subscribe(topic)  
    print("[mqtt] subscribe topic")  
  
  
client = connect_mqtt()  
subscribe(client, "hello")  
client.loop_start()
  
@app.route("/hello")  
def alarm():  
    client.publish("hello", "important msg")  
    print("[mqtt] send msg successfully", file=sys.stdout)  
    return "Mqtt message published"  
  
  
if __name__ == '__main__':  
    app.run()

该代码经本人实际测试,也是可以正常食用的。需要说明的是,如果你只是需要发布而不需要监听,那么client.loop_start()不是必要的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/125445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker网络中篇-docker网络的四种类型

通过上一篇学习,我们对docker网络有了初步的了解。本篇,咱们就来实战docker网络。 docker网络实战 实战docker网络,我们将从以下几个案例来讲解 1:birdge是什么? 2:host 3:none 4:container 实战网络类型如下: 在docker中,网络的配置是以json格式存在的,下面…

知识变现创业者必读——《知识变现实操手册》

现在越来越多人,正在跑步进入知识变现创业这个赛道。 为什么进入这个赛道,因为能赚钱钱啊,大部分人是受到了知识变现大咖们日入万元,月入十万,这些赚钱效益的刺激,匆忙进入的。 我问一句,你知识…

网络路由技术和协议

网络路由是网络通信的重要组成部分。路由可帮助您的网络组件从可用选项中选择最佳网络路径。这使得网络通信高效可靠。启用此功能的硬件组件称为路由器。监控和管理路由器是网络管理员日常工作中不可或缺的一部分。由于路由器可以决定网络连接和可用性的成败,因此了…

MATLAB-plot绘图函数

plot函数是MATLAB中最核心的二维绘图函数,它有多种语法格式可以实现多种功能。 plot函数的基本调用格式如下。 plot(y) 当y为向量时,是以y的分量为纵坐标、元素序号为横坐标,用直线依次连接数据点, 绘制曲线。若y为实矩阵&#…

【技术分享】如何实现功能完备性能优异的RTMP、RTSP播放器?

技术背景 这几年,我们对接了太多有RTSP或RTMP直播播放器诉求的开发者,他们当中除了寻求完整的解决方案的,还有些是技术探讨,希望能借鉴我们播放端的开发思路或功能特性,完善自己的产品。 忙里偷闲,今天我…

【GO】K8s 管理系统项目[API部分--Pv]

K8s 管理系统项目[API部分–Pv] 1. 接口实现 service/dataselector.go type pvCell corev1.PersistentVolumefunc(p pvCell) GetCreation() time.Time {return p.CreationTimestamp.Time }func(p pvCell) GetName() string {return p.Name }2. Pv功能 service/pv.go 2.1 获…

沙龙预告|2023 年展望 Web3 Crypto

全长 621 字,预计阅读 3 分钟 作者:MiX 起起落落的2022年即将结束,随着传统金融机构的采用和 Web3 创新的不断深入,加密领域已经成为全球资本和技术创新的重要组成部分。 总结2022,展望2023,这对每一位加…

【数据库与缓存保持一致性】

文章目录1. 方案1先更新数据库,再更新缓存先更新缓存,在更新数据库2. 方案2先更新数据库,在删缓存先删缓存,在更新数据库3. 方案3—如何保证两个操作都能执行成功?重试机制订阅 MySQL binlog1. 方案1 先更新数据库&am…

S7-1200PLC与组态王进行TCP通信的基本方法和步骤

S7-1200PLC与组态王进行TCP通信的基本方法和步骤 如下图所示,打开博途软件,新建一个项目,设置该PLC的IP地址为:192.168.1.102, 如下图所示,在OB1中编写一段简单的启保停程序, 如下图所示,打开Kingview组态王软件,点击文件—新建工程, 新建一个项目,如下图所示…

化工企业安全风险管控数字化解决方案

当前我国化工行业的基础能力缺失问题非常严重。由于一些共性技术的缺失,给以化工行业为代表的关键基础工业的产业安全带来诸多隐患。粗放式发展 带来的环保安全问题,不仅造成了巨大的资源浪费和社会成本的增加,同 时也使整个产业的发展环境恶…

SpringBoot+Vue前后端分离项目搭建

好久没写文章了!!! 企业级项目,开袋即食。扩展、修改起来非常方便 系统基本功能 用户管理:提供用户的相关配置,新增用户后,默认密码为:Pass_123角色管理:对权限与菜单…

基于注解的AOP之切入点表达式的语法和获取连接点的信息以及切入点表达式的重用

基于注解的AOP之切入点表达式的语法和获取连接点的信息以及切入点表达式的重用 1.切入点表达式的语法 ①作用 ②语法细节 用*号代替“权限修饰符”和“返回值”部分表示“权限修饰符”和“返回值”不限 在包名的部分,一个“*”号只能代表包的层次结构中的一层&…

Vue3+TS+Vant3——增删改input和通过双页面进行增删改操作

Vue3TSVant3——增删改input和通过双页面进行增删改操作 两种方案: 第一种点击添加按钮添加一项,缺点:页面过于臃肿,用户体验较差 第二种:分成两种页面进行添加等操作 先说一下第一种,我这里用到了va…

Linux下Qt程序用qBreakpad定位崩溃位置

目录1. 使用qBreakpad1.1. 下载1.2. 编译1.3. 使用2. 使用dump文件2.1. 编译breakpad2.2. 解析dmp文件生成sym文件2.3. 解析dmp可能遇到的问题问题一Qt程序的release版本交付给用户或者测试后,如果出现崩溃,很多时候都比较难重现,如果程序能自…

赛卓电子冲刺科创板上市:计划募资11亿元,股东包括尚颀资本等

12月28日,赛卓电子科技(上海)股份有限公司(下称“赛卓电子”)在上海证券交易所递交招股书,准备在科创板上市。本次冲刺上市,赛卓电子计划募资11亿元,将用于车规级芯片研发及产业化项…

高级网络复习——防火墙,OSPF协议,rip协议,三层,DHCP中继知识题解(带答案)

作者简介:一名在校云计算网络运维学生、每天分享网络运维的学习经验、和学习笔记。 座右铭:低头赶路,敬事如仪 个人主页:网络豆的主页​​​​​​ 目录 前言 简答题 二.选择题 前言 本章讲解讲解防火墙,OSPF协…

java多线程(9):线程状态

1 五大状态 2 线程方法 3 停止线程 不推荐使用JDK提供的 stop()、 destroy()方法。【已废弃】推荐线程自己停止下来建议使用一个标志位进行终止变量 ,当flagfalse,则终止线程运行。package xiong.demo3;public class TestStop implements Runnable{//1.…

从DTCC2022盘点各大厂商如何看待数据库发展趋势

DTCC 2022 已落下帷幕有些时日,回顾大会上的一些分享,尤其是头一天上午几大数据库厂商均在演讲一开始纷纷表达了对数据库发展趋势的看法。从各个厂商的观点来看,对数据库发展趋势基本属于是不谋而合,未来数据库的样子离不开以下这…

MATLAB形态学的基本运算膨胀和腐蚀(开、闭运算)

形态学中两种基本的操作是膨胀和腐蚀,膨胀是指在图像中为其边界添加像素点,而腐蚀是其逆过程。对应的添加和移除像素点数依赖于处理图像结构元素矩阵的大小和形式。 一.膨胀处理 膨胀的运算符为⊕, A用B来膨胀写作A⊕B,MATLAB中提供了预定义…

小程序-小程序框架

目录 一,框架简介 响应的数据绑定 二,视图层 View WXML (html) WXSS 样式 JS 逻辑交互 三,事件 什么是事件 事件的使用方式 四,逻辑层 APP service 1,生命周期 生命周期演示 页面路…