简介
本文旨在介绍如何快速上手联动flask + mqtt,本文将会给出一个简单的demo,用于演示在如何通过访问flask接口来触发mqtt,并在flask运行的基础的上对mqtt进行订阅。
快速上手
因为有项目需求,所以需要flask + mqtt进行联动,因为需要一直开启监听,所以需要一直挂在一个线程上,一开始想到用多线程做,或者说用异步协程来做,后来发现有一个关于flask的mqtt扩展库,因此为了快速上手,直接用该库进行开发,可以节省很多精力。
link: https://github.com/stlehmann/Flask-MQTT
首先,我们需要安装flask-mqtt库。
pip install flask-mqtt
下面放上一个简单的示例代码,里面包括了MQTT的订阅、发布,flask的访问。
"""
A small Test application to show how to use Flask-MQTT.
"""
import eventlet
from flask import Flask, render_template
from flask_mqtt import Mqtt
eventlet.monkey_patch()
app = Flask(__name__)
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 5
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_CLEAN_SESSION'] = True
mqtt = Mqtt(app)
@app.route('/')
def index():
return "hello mqtt_flask"
@app.route('/hello')
def hello():
mqtt.publish('hello', 'hello, this is flask')
print("[mqtt] publish successfully")
return "publish successfully"
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
mqtt.subscribe('hello')
print("[mqtt] has listen topic hello")
@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
print(data)
if __name__ == '__main__':
# important: Do not use reloader because this will create two Flask instances.
# Flask-MQTT only supports running with one instance # socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=False) app.run()
订阅测试
这里用了MQTTX工具来进行测试,现在我已经连接到了一个MQTT服务器上,我将用MQTTX向该服务器发送一个topic为hello
,payload为{"msg": "Hello, This is mqtt server"}
的消息。
运行程序后,MQTTX发送消息,结果如下,可以看到,接收正常。
发布测试
现在我们访问http://localhost:5000/hello
,flask服务器将会向我的MQTT服务器发送一个topic为hello
,payload为"hello, this is flask"
的消息,我们用MQTTX来监听。
从下图中可以看到,MQTTX成功订阅接收到了该消息,至此,关于FLASK+MQTT的必要流程已经可以走通了。
进一步测试
最初,笔者以为启动mqtt_client.loop_start()
函数之后会线程堵塞,但是在使用 Paho MQTT Python 库时,调用 client.loop_start()
方法会在后台启动一个新线程,用于处理 MQTT 客户端的事件循环。这意味着您的代码不会堵塞,并且可以在 MQTT 客户端的事件循环运行时继续执行。
例如,您可以在调用 client.loop_start()
后使用以下代码继续执行其他操作:
import paho.mqtt.client as mqtt
# The callback for when a message is received from the server
def on_message(client, userdata, message):
print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'")
# Create an MQTT client
client = mqtt.Client()
# Set the on_message callback
client.on_message = on_message
# Connect to the MQTT server
client.connect("mqtt.example.com", 1883, 60)
# Subscribe to a topic
client.subscribe("my/topic")
# Start the MQTT client loop in the background
client.loop_start()
# Do other work here
print("Doing other work")
# Wait for a while
time.sleep(10)
# Stop the MQTT client loop
client.loop_stop()
在上面的代码中,我们在调用 client.loop_start()
后立即执行了其他代码,然后等待了 10 秒钟。在这期间,MQTT 客户端的事件循环仍在后台运行,并等待接收消息。
基于此,那么flask和mqtt联动其实不用上述的扩展库也可以,笔者在paho官方的demo上小改一下,最后结果如下所示:
"""
访问 localhost:5000/hello时,会用mqtt客户端发布主题消息
"""
from flask import Flask, jsonify
import paho.mqtt.client as mqtt
import sys
app = Flask(__name__)
print('[app] start work', file=sys.stdout)
def connect_mqtt() -> mqtt:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!", file=sys.stdout)
else:
print("Failed to connect, return code %d\n", rc, file=sys.stdout)
client = mqtt.Client()
client.username_pw_set("", "")
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
return client
def subscribe(client: mqtt, topic):
def on_message(client, userdata, msg):
print(f"[mqtt] Received `{msg.payload.decode()}` from `{msg.topic}` topic", file=sys.stdout)
client.on_message = on_message
client.subscribe(topic)
print("[mqtt] subscribe topic")
client = connect_mqtt()
subscribe(client, "hello")
client.loop_start()
@app.route("/hello")
def alarm():
client.publish("hello", "important msg")
print("[mqtt] send msg successfully", file=sys.stdout)
return "Mqtt message published"
if __name__ == '__main__':
app.run()
该代码经本人实际测试,也是可以正常食用的。需要说明的是,如果你只是需要发布而不需要监听,那么client.loop_start()
不是必要的。