需求:节目上传至MINIO后,使用mqtt进行上报
环境准备
-
文件管理平台:首先需要使用minio搭建属于自己的对象存储(此步骤跳过)
-
通信方式:MQTT方式,客户端测试工具:MQTTX(https://mqttx.app/)
-
在minio平台上创建桶
-
为了连接桶,需要添加Access Keys,填写自己的Access Key 和 Secret Key
-
在minio平台上添加Events,然后让创建的桶订阅此话题,这个话题也是我们实现功能的关键步骤
Events->点击Add Event Destination->选择MQTT->填写相应信息,
注意:写Broker时,协议写成tcp,http不ok,保存事件后提示重新登陆,使配置有效
-
找到东订阅刚刚的event,效果图如下。
到此处,minio中的配置已经完成,此时就可以用代码实现:当桶中的添加入新的文件时,可以使用mqtt通信将信息进行上报。(这里我应该只选择put的,因为只对添加文件进行监视) -
python实现文件上传minio
from minio import Minio # 存储文件到桶对象中或者存储在桶下的某个文件夹下 def save_file(bucket, file_name, file_local_path): minio_client = Minio( "{0}:{1}".format("10.199.130.173", "9000"), secure=False, # 默认True[https] access_key="在minio中设置的access_key", secret_key="在minio中设置的secret_key", ) minio_client.fput_object(bucket, file_name, file_local_path) print("上传成功!!!") if __name__ == "__main__": # 存储图片文件到桶的主目录下 save_file("桶名", "目标文件:eg:/pre_no.png", "本地文件地址") # # 存储图片文件到桶下的某个文件夹下 # save_file("lg2", "/jpeg/pre_no.png", "D:\\Postgraduate\\MqttProject\\no_label\\1.png")
订阅minio创建的话题,当桶信息发生变化时,将结果发送给我们,可以根据收到的信息,判断,新节目上传到了桶上,我们这边发布话题,通知服务器端
#!/usr/bin/env python # 实现文件上传成功后,通过订阅上传成功的话题,会返回信息, # 然后需要我向broker publish一个topic,告知新节目上传到minio了 from __future__ import print_function import json import paho.mqtt.client as mqtt # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, rc): print("Connected with result code", rc) client.subscribe("minio设置的话题") def on_message(client, userdata, msg): if msg.retain == 1: print("此消息是保留信息") else: message_json = json.dumps({ 'topic': msg.topic, 'payload': msg.payload.decode(), 'qos': msg.qos, 'retain': msg.retain },sort_keys=True, indent=4, separators=(',', ': ')) print(message_json) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("broker.emqx.io", 1883) client.loop_forever()