Python使用MQTT连接新版ONENet

news2024/11/13 9:10:07

Python MQTT 连接新版ONENet

简介

前几个教程我们使用mqtt.fx连接了新版的ONENet, 只是跑通了MQTT协议,但是在实际操作下还需要实现具体环境、具体设备的MQTT连接,本章教程将以Python MQTT的方式连接 ONENet

参考文档:

paho-mqtt · PyPI

OneNET - 中国移动物联网开放平台 (10086.cn)

准备环境

pip安装 paho-mqtt

pip install paho-mqtt

获取ONENet 三元组

准备好Onenet的三元组
在这里插入图片描述

三元组分别为

DeviceName=“wenshidap” #设备ID

Productid = “kuerSLKlo8” #产品ID

accesskey=“QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk=”#秘钥

根据ONENet手册的文档说明,mqtt连接onenet需要进行鉴权,可以访问链接查看鉴权算法,但是我们可以根据鉴权算法说明生成鉴权的秘钥

使用Pyhon生成的鉴权秘钥的函数为

#认证token生成函数
def get_token(id,access_key):

    version = '2018-10-31'
 #   res = 'products/%s' % id  # 通过产品ID访问产品API
    # res = 'userid/%s' % id  # 通过产品ID访问产品API
    res="products/"+ Productid + "/devices/" + DeviceName
    # 用户自定义token过期时间
    et = str(int(time.time()) + 36000000)
    # et = str(int(1722499200))
    # 签名方法,支持md5、sha1、sha256
    method = 'sha1'
    method1 = 'sha256'
    # 对access_key进行decode
    key = base64.b64decode(access_key)

# 计算sign
    org = et + '\n' + method+ '\n' + res + '\n' + version
    sign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)
    sign = base64.b64encode(sign_b.digest()).decode()

# value 部分进行url编码,method/res/version值较为简单无需编码
    sign = quote(sign, safe='')
    res = quote(res, safe='')

# token参数拼接
    token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)

    return token

MQTT订阅和发布Topic说明

我们可以点击产品开发->设备开发->topic管理->数据流topic来查看当前设备可以订阅哪些topic,在这里我们是上传数据流,所以我们只关心发布的topic 和订阅上传成功和上传失败的topic

在这里插入图片描述

发布的topic :$sys/kuerSLKlo8/{device-name}/dp/post/json

只需要将我们的device-name换成当前的devicename即可 在本项目就是wenshidap

即:$sys/kuerSLKlo8/wenshidap/dp/post/json

订阅的topic:

上传成功:$sys/kuerSLKlo8/{device-name}/dp/post/json/accepted

上传失败:$sys/kuerSLKlo8/{device-name}/dp/post/json/rejected

当我们订阅上传成功topic时,数据流上传成功后就会返回msg的id 失败时 reject topic就会返回失败的原因

MQTT连接ONENet主程序

import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import struct
import json
import base64
import hmac
import time
from urllib.parse import quote

ServerUrl = "mqtts.heclouds.com" #服务器url
ServerPort = 1883#服务器端口
DeviceName="wenshidap" #设备ID
Productid = "kuerSLKlo8" #产品ID
accesskey="QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk="

# 发布的topic
Pub_topic1 = "$sys/"+Productid+"/"+ DeviceName+"/dp/post/json"

#需要订阅的topic
#数据上传成功的消息
Sub_topic1 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/accepted"
#接收数据上传失败的消息
Sub_topic2 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/rejected"

#测试用json数据格式
jsonstr = "{\"id\": 123,\"dp\": {\"ConEnv_Temp\": [{\"v\": 22.1}],\"ConEnv_Humi\": [{\"v\": 61.2}]}}"


#认证token生成函数
def get_token(id,access_key):

    version = '2018-10-31'
 #   res = 'products/%s' % id  # 通过产品ID访问产品API
    # res = 'userid/%s' % id  # 通过产品ID访问产品API
    res="products/"+ Productid + "/devices/" + DeviceName
    # 用户自定义token过期时间
    et = str(int(time.time()) + 36000000)
    # et = str(int(1722499200))
    # 签名方法,支持md5、sha1、sha256
    method = 'sha1'
    method1 = 'sha256'
    # 对access_key进行decode
    key = base64.b64decode(access_key)

# 计算sign
    org = et + '\n' + method+ '\n' + res + '\n' + version
    sign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)
    sign = base64.b64encode(sign_b.digest()).decode()

# value 部分进行url编码,method/res/version值较为简单无需编码
    sign = quote(sign, safe='')
    res = quote(res, safe='')

# token参数拼接
    token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)

    return token

def on_subscribe(client, userdata, mid, reason_code_list, properties):
    # Since we subscribed only for a single channel, reason_code_list contains
    # a single entry
    if reason_code_list[0].is_failure:
        print(f"Broker rejected you subscription: {reason_code_list[0]}")
    else:
        print(f"Broker granted the following QoS: {reason_code_list[0].value}")


def on_unsubscribe(client, userdata, mid, reason_code_list, properties):
    # Be careful, the reason_code_list is only present in MQTTv5.
    # In MQTTv3 it will always be empty
    if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
        print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")
    else:
        print(f"Broker replied with failure: {reason_code_list[0]}")
    client.disconnect()

# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code.is_failure:
        print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
    else:
        # we should always subscribe from on_connect callback to be sure
        # our subscribed is persisted across reconnections.
        # client.subscribe("$SYS/#")
        print("连接结果:" + mqtt.connack_string(reason_code))
        #连接成功后就订阅topic
        client.subscribe(Sub_topic1)
        client.subscribe(Sub_topic2)

# 从服务器接收发布消息时的回调。
def on_message(client, userdata, message):
    print(str(message.payload,'utf-8'))

#当消息已经被发送给中间人,on_publish()回调将会被触发
def on_publish(client, userdata, mid):
    print(str(mid))

def main():
    passw=get_token(DeviceName,accesskey)
    print(passw)
    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,DeviceName)
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_subscribe = on_subscribe
    mqttc.on_unsubscribe = on_unsubscribe

    # client = mqtt.Client(DeviceName,protocol=MQTTv311)
    #client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书
    mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)

    mqttc.username_pw_set(Productid,passw)
   
    mqttc.loop_start()

    
    while(1):

        mqttc.publish(Pub_topic1,jsonstr,qos=0)
        print("okk")
        time.sleep(2)

if __name__ == '__main__':
    main()

运行测试一下

在这里插入图片描述

可以看到我们订阅的topic 返回了我们消息的ID 123 说明我们的数据上传成功 ,平台上也可以看到我们的数据流

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1703306.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

左外连接和右外连接的区别?举例说明——以力扣sql 1378. 使用唯一标识码替换员工ID为例

左外连接(LEFT JOIN)和右外连接(RIGHT JOIN)的主要区别在于哪个表的所有行会保留在结果集中 1. 左外连接 (LEFT JOIN) 左外连接会返回左表中的所有行以及右表中符合连接条件的行。如果右表中没有匹配的行,结果集中右…

Livox-SDK2 用vs2017编译

Livox-SDK2 Livox-SDK2代码去上面下载,文章中给出的是用vs2019进行编译的,生成项目时用的 > cmake .. -G "Visual Studio 16 2019" -A x64 但如果我想用vs2017进行编译,那么只需要将上面语句改为如下: cmake .. -…

外贸软件工艺品行业版,解决管理难点,助力降本增效

随着全球经济的不断发展和贸易自由化程度的提高,我国工艺品出口贸易面临着广阔的市场空间和良好的发展机遇。同时,我国工艺品以其独特的文化内涵和精湛的制作工艺,赢得了全球消费者的喜爱和认可,为我国外贸发展作出了重要贡献。 …

【Linux 网络】网络基础(三)(数据链路层协议:以太网协议、ARP 协议)

一、以太网 两个不同局域网的主机传递数据并不是直接传递的,而是通过路由器 “一跳一跳” 的传递过去。 跨网络传输的本质:由无数个局域网(子网)转发的结果。 所以,要理解数据跨网络转发原理就要先理解一个局域网中数…

MagicaCloth2中文文档

提示:经搬运者测试,在ecs1.0中运行最为良好 如何安装 英语日语 目录 [隐藏] 1 如何安装2 样本运行测试3 可以删除示例文件夹4 如何更新5 发生错误时该怎么办6 如何卸载7 如何检查版本 如何安装 MagicaCloth2 需要 Unity 2021.3.16 (LTS&…

对比表征学习(二)Setence Embedding

参考翁莉莲的Blog,本章主要阐述句子嵌入(sentence embedding) 文本扩增 绝大部分视觉应用中的对比方法依赖于创造每个图像的增强版本,但是在句子扩增中会变得非常有挑战性。因为不同于图片,在扩增句子的同时非常容易改…

刷题记录5.22-5.27

文章目录 刷题记录5.22-5.2717.电话号码的字母组合78.子集131.分割回文串77.组合22.括号生成198.打家劫舍---从递归到记忆化搜索再到递推动态规划背包搜索模板494.目标和322.零钱兑换牛客小白月赛---数字合并线性DP1143.最长公共子序列72.编辑距离300.最长递增子序列状态机DP12…

基于 Pre-commit 的 Python项目代码风格统一实践

背景信息 统一代码风格首先需要定义参照的规范,每个团队可能会有自己的规范,我们选择的规范是 yapf mypy isort,如果保证所有的研发人员都遵循相关规范呢? 鼓励 IDE 中对应的插件的安装,通过直接对应的插件&#x…

Java毕业设计 基于springboot vue考勤管理系统

Java毕业设计 基于springboot vue考勤管理系统 SpringBoot 考勤管理系统 功能介绍 员工 登录 个人中心 修改密码 个人信息 员工请假管理 员工出差管理 薪资管理 员工签到管理 公告管理 管理员 登录 个人中心 修改密码 个人信息 员工管理 员工请假管理 员工出差管理 薪资管理…

Linux修炼之路之自动化构建工具,进度条,gdb调试器

目录 一:自动化构建工具make/makefile 生成内容: 清理内容: 对于多过程的: 对于多次make: 特殊符号: 二:小程序之进度条 三:git的简单介绍 四:Linux调试器gdb 接…

2024年蓝桥杯B组C++——复盘

1、握手问题 知识点:模拟 这道题很简单。但是不知道考试的时候有没有写错。一开始的43个人握手,仅需要两两握手,也就是从42个握手开始,而非43.很可惜。这道题没有拿稳这5分。也很有可能是这5分导致没有进决赛。 总结&#xff1a…

调用萨姆索诺夫函数:深入探索函数的参数与返回值

新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、萨姆索诺夫函数的引入与调用 二、如何获取函数的返回值 三、无参数与无返回值的函数调…

06_Tomcat

文章目录 Tomcat1.概念2.Tomcat安装3.Tomcat项目结构4.标准web项目结构5.Tomcat部署项目方式6.IDEA关联Tomcat6.1 构建tomcat和idea关联6.2 使用idea创建一个Javaweb工程6.3 使用idea将工程**构建**成一个app6.4 使用idea将构建好的app**部署**到tomcat中 Tomcat 1.概念 Tomc…

《浪姐》也搞live直播,真成综艺流量密码了?

继《歌手》之后,芒果的另一档综艺《浪姐》也将开启直播。 《乘风2024》官博宣布进行突击加场直播赛,姐姐们将面临全开麦live直播,摇人投票排在前十的姐姐获得live直播抢先权。 这是看《歌手2024》直播赛制火了,也想蹭个热度搞直…

拓展海外市场,助力中国海外运营企业实现全球化发展——工博科技SAP出海数字化解决方案

近年来,在全球化浪潮下,中国出海企业正从简单的产品扩张向更加成熟的跨国经营及全球化发展转变。中资企业要积极拓展国际市场、加大步伐融入全球的生态,打造韧性供应链,但面对风云变幻的国际形势,需要提高自身的风险管…

Java绩效考核系统源码 springboot员工绩效考核系统源码

Java绩效考核系统源码 springboot员工绩效考核系统源码-009 源码下载地址:https://download.csdn.net/download/xiaohua1992/89352195 项目介绍 本系统的功能分为管理员和员工两个角色 管理员的功能有: (1)个人中心管理功能&a…

基于Pytorch框架的深度学习ShufflenetV2神经网络十七种猴子动物识别分类系统源码

第一步:准备数据 17种猴子动物数据: self.class_indict ["白头卷尾猴", "弥猴", "山魈", "松鼠猴", "叶猴", "银色绒猴", "印度乌叶猴", "疣猴", "侏绒"…

DDR、LPDDR和GDDR的区别

1、概况 以DDR开头的内存适用于服务器、云计算、网络、笔记本电脑、台式机和消费类应用,支持更宽的通道宽度、更高的密度和不同的形状尺寸。 以LPDDR开头的内存适合面向移动和汽车这些对规格和功耗非常敏感的领域,提供更窄的通道宽度和多种低功耗运行状态…

node.js学习P3-P10

P3 npm package.json(package解读npm工具换镜像源) 一个package.json文件可以的作用 作为一个描述文件,描述了你的项目依赖哪些包 ,用来干什么的允许我们使用“语义版本规则”,指明你项目依赖的版本让你的构建更好的…

cin-getline缓存区

更多资源请关注纽扣编程微信公众号 cin.sync()清除缓存区 如果需要输入如下内容 3 This is C language. This is JAVA language. This is Python language. 写如下程序 #include<bits/stdc.h> using namespace std; string str[100]; int main(){int n;cin>&…